diff --git a/.eslintrc b/.eslintrc new file mode 100644 index 0000000..6ca3df1 --- /dev/null +++ b/.eslintrc @@ -0,0 +1,8 @@ +{ + "extends": [ + "standard" + ], + "env": { + "node": true + } +} diff --git a/.travis.yml b/.travis.yml index cb7de2a..d15608f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,6 +12,7 @@ before_install: before_script: - npm prune - npm run lint + - npm run coverage after_success: - npm run semantic-release branches: diff --git a/README.md b/README.md index 5f29a8f..052557b 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,7 @@ [![Commitizen friendly](https://img.shields.io/badge/commitizen-friendly-brightgreen.svg)](http://commitizen.github.io/cz-cli/) [![semantic-release](https://img.shields.io/badge/%20%20%F0%9F%93%A6%F0%9F%9A%80-semantic--release-e10079.svg)](https://github.com/semantic-release/semantic-release) [![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg)](http://standardjs.com/) +[![Coverage Status](https://coveralls.io/repos/github/tusharmath/Multi-threaded-downloader/badge.svg)](https://coveralls.io/github/tusharmath/Multi-threaded-downloader) This is a nodejs based module that helps you in performing **resumable**, **multi-threaded** downloads via Http. The module is highly inspired by Speedbit's — [Download Accelerator Plus](http://www.speedbit.com/dap/). diff --git a/package.json b/package.json index 26d5bbe..ea68ae5 100644 --- a/package.json +++ b/package.json @@ -17,36 +17,38 @@ "build": "babel src --out-dir .dist", "test": "ava", "lint": "standard --verbose | snazzy", + "coverage": "nyc npm test && nyc report --reporter=text-lcov | coveralls", "semantic-release": "semantic-release pre && npm publish && semantic-release post" }, "engines": { "node": ">= 5.0.0" }, - "main": ".dist/index.js", + "main": ".dist/Main.js", "dependencies": { "graceful-fs": "^4.1.3", "humanize-plus": "^1.8.1", - "immutable": "^3.7.5", - "lodash": "^4.0.0", "meow": "^3.7.0", + "muxer": "^1.0.1", "progress": "^1.1.8", - "reactive-storage": "^3.0.0", + "ramda": "^0.21.0", "request": "^2.60.0", "rx": "^4.0.7", "valid-url": "^1.0.9" }, "license": "MIT", "devDependencies": { - "eslint": "^2.11.1", "ava": "^0.15.0", "babel-cli": "^6.9.0", "babel-plugin-transform-es2015-modules-commonjs": "^6.8.0", "babel-register": "^6.9.0", + "coveralls": "^2.11.9", "cz-conventional-changelog": "^1.1.5", + "eslint": "^2.11.1", "express": "^4.13.1", "ghooks": "^1.0.3", + "nyc": "^6.6.1", "semantic-release": "^4.3.5", - "sinon": "^1.17.2", + "sinon": "^1.17.4", "snazzy": "^4.0.0", "standard": "^7.0.1", "validate-commit-msg": "^2.0.0" diff --git a/perf/utils.js b/perf/TestHelpers.js similarity index 100% rename from perf/utils.js rename to perf/TestHelpers.js diff --git a/src/CreateMTDFile.js b/src/CreateMTDFile.js new file mode 100644 index 0000000..79499c9 --- /dev/null +++ b/src/CreateMTDFile.js @@ -0,0 +1,41 @@ +/** + * Created by tushar.mathur on 29/06/16. + */ + +'use strict' +import {mux} from 'muxer' +import {Observable as O} from 'rx' +import { + RemoteFileSize$, + CreateMeta$, + CreateWriteBufferAtParams, + JSToBuffer$ +} from './Utils' + +export const CreateMTDFile = ({FILE, HTTP, options}) => { + /** + * Create a new file + */ + const fd$ = FILE.open(O.just([options.mtdPath, 'w'])) + + /** + * Retrieve file size on remote server + */ + const size$ = RemoteFileSize$({HTTP, options}) + + /** + * Create initial meta data + */ + const meta$ = CreateMeta$({options, size$}) + + /** + * Create a new file with meta info appended at the end + */ + const written$ = FILE.write(CreateWriteBufferAtParams({ + FILE, + fd$: fd$, + buffer$: JSToBuffer$(meta$), + position$: size$ + })) + return mux({written$, meta$, remoteFileSize$: size$, fdW$: fd$}) +} diff --git a/src/DownloadFromMTDFile.js b/src/DownloadFromMTDFile.js new file mode 100644 index 0000000..86c8ae7 --- /dev/null +++ b/src/DownloadFromMTDFile.js @@ -0,0 +1,74 @@ +/** + * Created by tushar.mathur on 29/06/16. + */ + +'use strict' + +import {mux} from 'muxer' +import {Observable as O} from 'rx' +import { + CreateWriteBufferAtParams, + JSToBuffer$, + LocalFileSize$, + MetaPosition$, + ReadJSON$, + demuxFPH, + RequestWithMeta, + WriteBuffer, + SetMetaOffsets, + RxThrottleComplete +} from './Utils' + +export const DownloadFromMTDFile = ({FILE, HTTP, mtdPath}) => { + /** + * Open file to read+append + */ + const fd$ = FILE.open(O.just([mtdPath, 'r+'])) + + /** + * Retrieve File size on disk + */ + const size$ = LocalFileSize$({FILE, fd$}) + + /** + * Retrieve Meta info + */ + const metaPosition$ = MetaPosition$({size$}) + const meta$ = ReadJSON$({FILE, fd$, position$: metaPosition$}) + + /** + * Make a HTTP request for each thread + */ + const {response$, buffer$} = demuxFPH( + ['buffer$', 'response$'], RequestWithMeta(HTTP, meta$) + ) + + /** + * Create write params and save buffer+offset to disk + */ + const bufferWritten$ = WriteBuffer({FILE, fd$, buffer$}) + + /** + * Update META info + */ + const nMeta$ = SetMetaOffsets({meta$, bufferWritten$}) + + /** + * Persist META to disk + */ + const metaWritten$ = FILE.write(CreateWriteBufferAtParams({ + fd$, + buffer$: JSToBuffer$(RxThrottleComplete(meta$.pluck('metaWrite'), nMeta$)), + position$: size$ + })) + + /** + * Create sink$ + */ + return mux({ + metaWritten$, response$, + localFileSize$: size$, + fdR$: fd$, metaPosition$, + meta$: O.merge(nMeta$, meta$) + }) +} diff --git a/src/IO.js b/src/IO.js new file mode 100644 index 0000000..74f8b93 --- /dev/null +++ b/src/IO.js @@ -0,0 +1,40 @@ +'use strict' + +import {Observable as O} from 'rx' +import * as Rx from './RxFP' +import {demux} from 'muxer' +import R from 'ramda' +import {Request} from './Request' + +export const fromCB = R.compose(R.apply, O.fromNodeCallback) +export const toOB = cb => R.compose( + Rx.shareReplay(1), + Rx.flatMap(fromCB(cb)) +) +export const FILE = R.curry((fs) => { + return { + // New Methods + open: toOB(fs.open), + fstat: toOB(fs.fstat), + read: toOB(fs.read), + write: toOB(fs.write), + close: toOB(fs.close), + truncate: toOB(fs.truncate), + rename: toOB(fs.rename) + } +}) + +export const HTTP = R.curry((_request) => { + const request = Request(_request) + const requestHead = (params) => { + const [{response$}] = demux(request(params), 'response$') + return response$.first().tap(x => x.destroy()).share() + } + + const select = R.curry((event, request$) => request$.filter(x => x.event === event).pluck('message')) + return { + requestHead, + select, + request + } +}) diff --git a/src/Main.js b/src/Main.js new file mode 100644 index 0000000..1ddd925 --- /dev/null +++ b/src/Main.js @@ -0,0 +1,58 @@ +/** + * Created by tusharmathur on 5/15/15. + */ +'use strict' +import request from 'request' +import fs from 'graceful-fs' +import {Observable as O} from 'rx' +import R from 'ramda' +import * as U from './Utils' +import {CreateMTDFile} from './CreateMTDFile' +import {DownloadFromMTDFile} from './DownloadFromMTDFile' +import * as T from './IO' +import {mux, demux} from 'muxer' + +export const UTILS = U +export const createDownload = (_options) => { + const HTTP = T.HTTP(request) + const FILE = T.FILE(fs) + const options = U.MergeDefaultOptions(_options) + + /** + * Create MTD File + */ + const createMTDFile$ = CreateMTDFile({FILE, HTTP, options}).share() + const [{fdW$}] = demux(createMTDFile$, 'fdW$') + + /** + * Download From MTD File + */ + const downloadFromMTDFile$ = createMTDFile$.last() + .map({HTTP, FILE, mtdPath: options.mtdPath}) + .flatMap(DownloadFromMTDFile) + .share() + + const [{fdR$, meta$, response$}] = demux(downloadFromMTDFile$, 'meta$', 'fdR$', 'response$') + + /** + * Finalize Downloaded FILE + */ + const finalizeDownload$ = downloadFromMTDFile$.last() + .withLatestFrom(fdR$, meta$, (_, fd, meta) => ({ + FILE, + fd$: O.just(fd), + meta$: O.just(meta) + })) + .flatMap(U.FinalizeDownload) + .share() + .last() + + /** + * Close File Descriptors + */ + const fd$ = finalizeDownload$.withLatestFrom(fdW$, fdR$) + .map(R.tail) + .flatMap(R.map(R.of)) + const closed$ = FILE.close(fd$) + return [mux({response$, meta$, closed$}), {FILE, HTTP, UTILS}] +} diff --git a/src/NewDownload.js b/src/NewDownload.js deleted file mode 100644 index 3fd8a46..0000000 --- a/src/NewDownload.js +++ /dev/null @@ -1,13 +0,0 @@ -import Rx from 'rx' -import VALID_URL from 'valid-url' -import _ from 'lodash' -import {pathGenerator} from './Utils' - -export default (createDownload, pFlags) => pFlags - .filter((x) => VALID_URL.isUri(x.url)) - .flatMap((x) => { - const defaultParams = {path: pathGenerator(x.url)} - _.defaults(x, defaultParams) - const source = createDownload(x) - return Rx.Observable.merge(source.start(), source.stats) - }) diff --git a/src/Request.js b/src/Request.js new file mode 100644 index 0000000..aa94336 --- /dev/null +++ b/src/Request.js @@ -0,0 +1,28 @@ +/** + * Created by tushar.mathur on 18/06/16. + */ + +'use strict' + +import {Observable as O} from 'rx' +import {mux} from 'muxer' +import R from 'ramda' + +export const ev = R.curry(($, event) => $.filter(R.whereEq({event})).pluck('message')) + +export const RequestParams = R.curry((request, params) => { + return O.create((observer) => request(params) + .on('data', (message) => observer.onNext({event: 'data', message})) + .on('response', (message) => observer.onNext({event: 'response', message})) + .on('complete', () => observer.onCompleted()) + .on('error', (error) => observer.onError(error)) + ) +}) + +export const Request = R.curry((request, params) => { + const Response$ = ev(RequestParams(request, params)) + return mux({ + response$: Response$('response'), + data$: Response$('data') + }) +}) diff --git a/src/ResumeDownload.js b/src/ResumeDownload.js deleted file mode 100644 index 6c9d05c..0000000 --- a/src/ResumeDownload.js +++ /dev/null @@ -1,15 +0,0 @@ -import {normalizePath} from './Utils' -import _ from 'lodash' -import Rx from 'rx' - -export default (createDownload, pFlags) => pFlags - .skipWhile((x) => x.url) - .flatMap((x) => { - const defaultParams = { - mtdPath: normalizePath(x.file), - path: normalizePath(x.file.replace('.mtd', '')) - } - _.defaults(x, defaultParams) - const source = createDownload(x) - return Rx.Observable.merge(source.download(), source.stats) - }) diff --git a/src/RxFP.js b/src/RxFP.js new file mode 100644 index 0000000..cc99da7 --- /dev/null +++ b/src/RxFP.js @@ -0,0 +1,23 @@ +/** + * Created by tushar.mathur on 10/06/16. + */ + +'use strict' + +import R from 'ramda' +import {Observable as O} from 'rx' +export const map = R.curry((func, $) => $.map(func)) +export const flatMap = R.curry((func, $) => $.flatMap(func)) +export const withLatestFrom = R.curry((list, $) => $.withLatestFrom(...list)) +export const zip = R.curry((list, $) => $.zip(...list)) +export const zipWith = R.curry((func, list, $) => $.zip(...list, func)) +export const filter = R.curry((func, $) => $.filter(func)) +export const distinctUntilChanged = $ => $.distinctUntilChanged() +export const pluck = R.curry((path, $) => $.pluck(path)) +export const scan = R.curry((func, $) => $.scan(func)) +export const scanWith = R.curry((func, m, $) => $.scan(func, m)) +export const shareReplay = R.curry((count, $) => $.shareReplay(count)) +export const repeat = R.curry((value, count) => O.repeat(value, count)) +export const trace = R.curry((msg, $) => $.tap(x => console.log(msg, x))) +export const tap = R.curry((func, $) => $.tap(func)) +export const share = ($) => $.share() diff --git a/src/Transformers.js b/src/Transformers.js deleted file mode 100644 index e4d6f42..0000000 --- a/src/Transformers.js +++ /dev/null @@ -1,33 +0,0 @@ -import request from 'request' -import Rx from 'rx' -import fs from 'graceful-fs' -import _ from 'lodash' -import * as u from './Utils' - -export const requestBody = (params) => Rx.Observable.create((observer) => request(params) - .on('data', (message) => observer.onNext({event: 'data', message})) - .on('response', (message) => observer.onNext({event: 'response', message})) - .on('complete', (message) => observer.onCompleted({event: 'completed', message})) - .on('error', (error) => observer.onError(error)) -) - -export const requestHead = (params) => requestBody(params) - .first() - .pluck('message') - .tap((x) => x.destroy()) - -export const requestContentLength = (params) => requestHead(params) - .pluck('headers', 'content-length') - .map((x) => parseInt(x, 10)) - -export const fsOpen = Rx.Observable.fromNodeCallback(fs.open) -export const fsWrite = Rx.Observable.fromNodeCallback(fs.write) -export const fsTruncate = Rx.Observable.fromNodeCallback(fs.truncate) -export const fsRename = Rx.Observable.fromNodeCallback(fs.rename) -export const fsStat = Rx.Observable.fromNodeCallback(fs.fstat) -export const fsRead = Rx.Observable.fromNodeCallback(fs.read) -export const fsReadBuffer = (x) => fsRead(x.fd, x.buffer, 0, x.buffer.length, x.offset) -export const fsWriteBuffer = (x) => fsWrite(x.fd, x.buffer, 0, x.buffer.length, x.offset) -export const fsWriteJSON = (x) => fsWriteBuffer(_.assign({}, x, {buffer: u.toBuffer(x.json)})) -export const fsReadJSON = (x) => fsReadBuffer(x).map((x) => JSON.parse(x[1].toString())) -export const buffer = (size) => Rx.Observable.just(u.createEmptyBuffer(size)) diff --git a/src/Utils.js b/src/Utils.js index 4854d1f..72e86bd 100644 --- a/src/Utils.js +++ b/src/Utils.js @@ -4,126 +4,234 @@ 'use strict' -import _ from 'lodash' import PATH from 'path' import URL from 'url' -import Rx from 'rx' -import * as u from './Utils' -import {create} from 'reactive-storage' -import Immutable from 'immutable' +import {Observable as O} from 'rx' +import R from 'ramda' +import * as Rx from './RxFP' +import {mux, demux} from 'muxer' import {MTDError, FILE_SIZE_UNKNOWN} from './Error' -import * as ob from './Transformers' -const PROPS = ['range', 'url', 'totalBytes', 'threads', 'offsets', 'strictSSL'] - -export const initialize = (ob, options) => ob - .requestContentLength(options) - .map((totalBytes) => { - if (!_.isFinite(totalBytes)) throw new MTDError(FILE_SIZE_UNKNOWN) - const threads = u.splitRange(totalBytes, options.range) - return _.assign({}, options, {totalBytes, threads, offsets: threads.map((x) => x[0])}) - }) - .map((x) => _.pick(x, PROPS)) - -export const load = (fileDescriptor) => { - const contentLength = fileDescriptor.flatMap((x) => ob.fsStat(x)).pluck('size').map((x) => x - 512) - return Rx.Observable.combineLatest( - contentLength, - fileDescriptor, - ob.buffer(512), - u.selectAs('offset', 'fd', 'buffer')) - .flatMap(ob.fsReadJSON) +const first = R.nth(0) +const second = R.nth(1) +export const trace = R.curry((msg, value) => { + console.log(msg, value) + return value +}) +export const demuxFP = R.curry((list, $) => demux($, ...list)) +export const demuxFPH = R.curry((list, $) => R.head(demux($, ...list))) +export const BUFFER_SIZE = 512 +export const NormalizePath = (path) => PATH.resolve(process.cwd(), path) +export const GenerateFileName = (x) => { + return R.last(URL.parse(x).pathname.split('/')) || Date.now() } - -export const save = (ob, fileDescriptor, metaJSON) => metaJSON - .combineLatest(fileDescriptor, u.selectAs('json', 'fd')) - .map((x) => _.assign(x, {offset: x.json.totalBytes})) - .flatMap(ob.fsWriteJSON) - .map((x) => JSON.parse(x[1].toString())) - -export const update = (baseMeta, bytesSaved, offsets) => bytesSaved - .withLatestFrom(baseMeta, offsets, u.selectAs('content', 'meta', 'offsets')) - .map((x) => _.assign({}, x.meta, {offsets: x.offsets.toJS()})) - .distinctUntilChanged() - -export const toBuffer = function (obj, size) { - var buffer = createEmptyBuffer(size) - buffer.write(JSON.stringify(obj)) +export const ResolvePath = R.compose(NormalizePath, GenerateFileName) +export const SplitRange = (totalBytes, range) => { + const delta = Math.round(totalBytes / range) + const start = R.times((x) => x * delta, range) + const end = R.times((x) => (x + 1) * delta - 1, range) + end[range - 1] = totalBytes + return R.zip(start, end) +} +export const CreateRangeHeader = ([start, end]) => `bytes=${start}-${end}` +export const SetRangeHeader = ({request, range}) => { + return R.set( + R.lensPath(['headers', 'range']), + CreateRangeHeader(range), + R.omit(['threads', 'offsets'], request) + ) +} +export const CreateRequestParams = ({meta, index}) => { + const range = [meta.offsets[index], second(meta.threads[index])] + return SetRangeHeader({request: meta, range}) +} +export const ToBuffer = R.curry((size, str) => { + var buffer = CreateFilledBuffer(size) + buffer.write(str) + return buffer +}) +export const CreateFilledBuffer = (size = BUFFER_SIZE, fill = ' ') => { + const buffer = new Buffer(size) + buffer.fill(fill) return buffer } +export const MergeDefaultOptions = (options) => R.mergeAll([ + {mtdPath: options.path + '.mtd', range: 3, metaWrite: 300}, + options +]) -export const selectAs = function () { - const keys = _.toArray(arguments) - return function () { - const values = _.toArray(arguments) - const merge = (m, k, i) => (m[k] = values[i]) - return _.transform(keys, merge, {}) - } -} +// TODO: Use R.lens instead +export const GetOffset = R.curry((meta, index) => meta.offsets[index]) +export const GetThread = R.curry((meta, index) => meta.threads[index]) +export const GetThreadStart = R.curryN(2, R.compose(R.nth(0), GetThread)) +export const GetThreadEnd = R.curryN(2, R.compose(R.nth(1), GetThread)) +export const GetThreadCount = R.compose(R.length, R.prop('threads')) +export const TimesCount = R.times(R.identity) -export const log = function () { - return console.log.apply(console, _.toArray(arguments)) +/* + * STREAM BASED + */ +export const GetBufferWriteOffset = ({buffer$, initialOffset}) => { + const accumulator = ([_buffer, _offset], buffer) => [buffer, _buffer.length + _offset] + return buffer$.scan(accumulator, [{length: 0}, initialOffset]) +} +export const SetBufferParams = ({buffer$, index, meta}) => { + const initialOffset = GetOffset(meta, index) + const addParams = R.compose(Rx.map(R.append(index)), GetBufferWriteOffset) + return addParams({buffer$, initialOffset}) } -export const createEmptyBuffer = function (size) { - var buffer = new Buffer(size || 512) - buffer.fill(' ') - return buffer +/** + * Makes an HTTP request using the {HttpRequest} function and appends the + * buffer response with appropriate write position and thread index. + * @function + * @param {Object} HTTP - HTTP transformer + * @param {function} HTTP.request - HTTP request function + * @param {Object} r - a dict of meta and selected thread index + * @param {Object} r.meta - the download meta info + * @param {Object} r.index - index of the selected thread + * @returns {Observable} a muxed {buffer$, response$} + */ +export const RequestThread = R.curry((HTTP, {meta, index}) => { + const pluck = demuxFPH(['data$', 'response$']) + const HttpRequest = R.compose(HTTP.request, CreateRequestParams) + const {response$, data$} = pluck(HttpRequest({meta, index})) + const buffer$ = SetBufferParams({buffer$: data$, meta, index}) + return mux({buffer$, response$}) +}) +export const ToJSON$ = source$ => source$.map(JSON.stringify.bind(JSON)) +export const ToBuffer$ = source$ => source$.map(ToBuffer(BUFFER_SIZE)) +export const JSToBuffer$ = R.compose(ToBuffer$, ToJSON$) +export const BufferToJS$ = buffer$ => { + return buffer$.map(buffer => JSON.parse(buffer.toString())) } -export const normalizePath = (path) => PATH.resolve(process.cwd(), path) -export const fileNameGenerator = (x) => _.last(URL.parse(x).pathname.split('/')) || Date.now() -export const pathGenerator = (x) => normalizePath(fileNameGenerator(x)) -export const rangeHeader = (range) => ({range: `bytes=${range[0]}-${range[1]}`}) -export const bufferOffset = (buffer, offset) => { - if (typeof offset !== 'number') { - offset = 0 - } - return buffer - .map((buffer) => ({buffer, offset})) - .tap((x) => (offset += x.buffer.length)) +export const RemoteFileSize$ = ({HTTP, options}) => { + return HTTP.requestHead(options) + .pluck('headers', 'content-length') + .map((x) => parseInt(x, 10)) } -export const splitRange = (totalBytes, range) => { - const delta = Math.round(totalBytes / range) - const start = _.times(range, (x) => x * delta) - const end = _.times(range, (x) => (x + 1) * delta - 1) - end[range - 1] = totalBytes - return _.zip(start, end) -} -export const createFD = _.curry((ob, path, flag) => ob.fsOpen(path, flag)) -export const bufferSave = (ob, fileDescriptor, content) => { - return content - .combineLatest(fileDescriptor, (content, fd) => _.assign(content, {fd})) - .flatMap((x) => ob.fsWriteBuffer(x).map(x)) -} -export const contentLoad = (ob, metaStream) => metaStream - .flatMap((meta) => meta.threads.map((range, index) => ({range, meta, index}))) - .map((x) => { - const offset = x.meta.offsets[x.index] - const range = rangeHeader([offset, x.range[1]]) - const params = _.omit(x.meta, 'threads', 'offsets') - params.headers = _.assign({}, params.headers, range) - return {params, range: x.range, index: x.index, offset} +export const LocalFileSize$ = ({FILE, fd$}) => { + return FILE.fstat(fd$.map(R.of)).pluck('size') +} +export const PickFirst = R.map(first) +export const CreateMeta$ = ({size$, options}) => { + return size$.map((totalBytes) => { + if (!isFinite(totalBytes)) throw new MTDError(FILE_SIZE_UNKNOWN) + const threads = SplitRange(totalBytes, options.range) + return R.merge(options, {totalBytes, threads, offsets: PickFirst(threads)}) }) - .flatMap((x) => bufferOffset(ob.requestBody(x.params) - .filter((x) => x.event === 'data') - .pluck('message'), x.offset) - .map((i) => _.assign({}, i, {range: x.range, index: x.index})) +} +export const ReadFileAt$ = ({FILE, fd$, position$, size = BUFFER_SIZE}) => { + const readParams$ = O.combineLatest(position$, fd$) + const buffer = CreateFilledBuffer(size) + const toParam = ([position, fd]) => [fd, buffer, 0, buffer.length, position] + return FILE.read(readParams$.map(toParam)) +} +export const MetaPosition$ = ({size$}) => size$.map(R.add(-BUFFER_SIZE)) +export const CreateWriteBufferAtParams = ({fd$, buffer$, position$}) => { + const toParam = ([buffer, fd, position]) => [fd, buffer, 0, buffer.length, position] + return O.combineLatest(buffer$, fd$, position$.first()).map(toParam) +} +export const CreateWriteBufferParams = R.compose( + O.just, + ([fd, buffer, position]) => [fd, buffer, 0, buffer.length, position], + R.unnest +) +export const SetMetaOffsets = ({meta$, bufferWritten$}) => { + const offsetLens = thread => R.compose(R.lensProp('offsets'), R.lensIndex(thread)) + const start$ = meta$.map(meta => ({meta, len: 0, thread: 0})).first() + const source$ = O.merge( + start$, + bufferWritten$.map(x => [x[3], x[2]]) + .map(R.zipObj(['len', 'thread'])) + .withLatestFrom(meta$.map(R.objOf('meta'))) + .map(R.mergeAll) ) -export const defaultOptions = {range: 3} -export const initParams = (options) => _.defaults( - options, - defaultOptions, - {mtdPath: options.path + '.mtd'} -) + const accumulator = (previous, current) => { + const thread = current.thread + const pMeta = previous.meta + const oldVal = pMeta.offsets[thread] + const lens = offsetLens(thread) + const meta = R.set(lens, R.add(oldVal, current.len), pMeta) + return R.merge(current, {meta}) + } + return source$ + .scan(accumulator) + .skip(1) + .pluck('meta') +} +export const ReadJSON$ = R.compose(BufferToJS$, Rx.map(second), ReadFileAt$) +export const IsOffsetInRange = R.curry((meta, i) => { + const start = R.lte(GetThreadStart(meta, i)) + const end = R.gt(GetThreadEnd(meta, i)) + const inRange = R.allPass([start, end]) + return inRange(GetOffset(meta, i)) +}) +export const FlattenMeta$ = Rx.flatMap((meta) => { + const MergeMeta = R.map(R.compose(R.merge({meta}), R.objOf('index'))) + const IsValid = R.filter(IsOffsetInRange(meta)) + return MergeMeta(IsValid(TimesCount(GetThreadCount(meta)))) +}) +export const RxThrottleComplete = (window$, $, sh) => { + const selector = window => O.merge($.throttle(window, sh), $.last()) + return window$.first().flatMap(selector) +} +export const IsCompleted$ = (meta$) => { + const offsetsA = R.prop('offsets') + const offsetsB = R.compose(R.map(second), R.prop('threads')) + const subtract = R.apply(R.subtract) + const diff = R.compose(R.all(R.lte(0)), R.map(subtract), R.zip) + const isComplete = R.converge(diff, [offsetsA, offsetsB]) + return meta$.map(isComplete).distinctUntilChanged() +} -export const downloadMTD = (ob, fd) => { - const offsets = create(Immutable.List([])) - const loadedMETA = load(fd) - .tap((x) => offsets.set((i) => i.merge(x.offsets))) - const loadedContent = contentLoad(ob, loadedMETA) - const savedContent = bufferSave(ob, fd, loadedContent) - .tap((x) => offsets.set((i) => i.set(x.index, x.offset))) - const currentMETA = update(loadedMETA, savedContent, offsets.stream) - return save(ob, fd, currentMETA) +/** + * Removes the appended meta data and the .mtd extension from the file. In case + * there still some data leftover to be downloaded, this step will be ignored. + * @function + * @param {Object} FILE - File transformer + * @param {Observable} fd$ - File descriptor observable + * @param {Observable} meta$ - Download meta information + * @returns {Observable} + */ +export const FinalizeDownload = ({FILE, fd$, meta$}) => { + const [ok$, noop$] = IsCompleted$(meta$).partition(Boolean) + const Truncate = ({FILE, meta$, fd$}) => { + const size$ = meta$.pluck('totalBytes') + return FILE.truncate(O.combineLatest(fd$, size$).take(1)) + } + const Rename = ({FILE, meta$}) => { + const params$ = meta$.map(meta => [meta.mtdPath, meta.path]).take(1) + return FILE.rename(params$) + } + return O.merge( + mux({noop$}), + ok$.flatMap(() => { + const truncated$ = Truncate({FILE, meta$, fd$}) + const renamed$ = truncated$.flatMap(() => Rename({FILE, meta$})) + return mux({truncated$, renamed$}) + }) + ) +} +export const WriteBuffer = ({FILE, fd$, buffer$}) => { + const Write = R.compose(FILE.write, CreateWriteBufferParams) + return O.combineLatest(fd$, buffer$) + .flatMap(params => { + return Write(params).map(R.concat(R.nth(1, params))) + }) } + +/** + * Makes HTTP requests to start downloading data for each thread described in + * the meta data. + * @function + * @param {Object} HTTP - an HTTP transformer + * @param {function} HTTP.request - an HTTP transformer + * @param {Observable} meta$ - meta data as a stream + * @returns {Observable} - muxed stream of responses$ and buffer$ + */ +export const RequestWithMeta = R.uncurryN(2, (HTTP) => R.compose( + Rx.flatMap(RequestThread(HTTP)), + FlattenMeta$ +)) diff --git a/src/bin/mtd.js b/src/bin/mtd.js index b23e6b6..66a59c7 100644 --- a/src/bin/mtd.js +++ b/src/bin/mtd.js @@ -4,17 +4,10 @@ */ 'use strict' - -import Rx from 'rx' -import _ from 'lodash' +import * as U from '../Utils' import meow from 'meow' -import humanize from 'humanize-plus' -import newDownload from '../NewDownload' -import resumeDownload from '../ResumeDownload' -import {createDownload} from '../index' -import ProgressBar from 'progress' - -const flags = meow(` +import R from 'ramda' +const HELP_TEXT = ` Usage mtd @@ -25,39 +18,12 @@ const flags = meow(` Examples mtd --url http://www.sample-videos.com/video/mp4/720/big_buck_bunny_720p_1mb.mp4 mtd --file big_buck_bunny_720p_1mb.mp4.mtd - `).flags + ` +const flags = meow(HELP_TEXT).flags -if (!_.some([flags.url, flags.file], (x) => x)) { +if (!R.any(R.identity)([flags.url, flags.file])) { + console.log(HELP_TEXT) process.exit(0) } -const pFlags = Rx.Observable.just(flags) - -// TODO: Add unit tests -const downloads = Rx.Observable.merge( - newDownload(createDownload, pFlags), - resumeDownload(createDownload, pFlags) -).share() - -const progress = downloads - .pluck('message', 'totalBytes') - .filter((x) => x > 0) - .first() - .map((total) => new ProgressBar(':bar :percent', {total, complete: '█', incomplete: '░'})) - .tap((x) => console.log(`SIZE: ${humanize.fileSize(x.total)}`)).share() - -downloads - .filter((x) => x) - .filter((x) => x.event === 'DATA') - .pluck('message') - - .map((x) => _.sum(_.map(x.offsets, (o, i) => o - x.threads[i][0])) / x.totalBytes) - .withLatestFrom(progress, (bytes, progress) => ({bytes, progress})) - .subscribe((x) => x.progress.update(x.bytes)) - -downloads.last() - .withLatestFrom(progress, (a, b) => b) - .subscribe((x) => { - x.update(x.total) - console.log('Download Completed!') - }) +U.CLI(flags) diff --git a/src/index.js b/src/index.js deleted file mode 100644 index 93e5401..0000000 --- a/src/index.js +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Created by tusharmathur on 5/15/15. - */ -'use strict' -import Rx from 'rx' -import _ from 'lodash' -import {createFD, initParams, save, initialize, downloadMTD} from './Utils' -import * as ob from './Transformers' - -const initMTD = (ob, fd, options) => { - const initialMETA = initialize(ob, options) - return save(ob, fd, initialMETA) -} - -export class Download { - constructor (ob, options) { - this.options = initParams(options) - this.ob = ob - this.fd = createFD(ob, this.options.mtdPath) - this.stats = new Rx.BehaviorSubject() - this.toStat = _.curry((event, message) => this.stats.onNext({event, message})) - this.toStat('INIT', this.options) - } - - start () { - return this - .init() - .flatMap(() => this.download()) - } - - init () { - return initMTD(this.ob, this.fd('w'), this.options) - .tap(this.toStat('CREATE')) - } - - download () { - const fd = this.fd('r+') - const options = this.options - const ob = this.ob - return downloadMTD(ob, fd) - .tap(this.toStat('DATA')) - .last() - .flatMap((x) => ob.fsTruncate(options.mtdPath, x.totalBytes)) - .tap(this.toStat('TRUNCATE')) - .flatMap(() => ob.fsRename(options.mtdPath, options.path)) - .tap(this.toStat('RENAME')) - .tapOnCompleted((x) => this.stats.onCompleted()) - } - - stop () { - } -} - -export const createDownload = (options) => new Download(ob, options) diff --git a/test/integration/test.IO.js b/test/integration/test.IO.js new file mode 100644 index 0000000..55edc81 --- /dev/null +++ b/test/integration/test.IO.js @@ -0,0 +1,61 @@ +/** + * Created by tushar.mathur on 21/01/16. + */ + +'use strict' +import request from 'request' +import {server} from '../../perf/server' +import test from 'ava' +import {HTTP} from '../../src/IO' +import {demux} from 'muxer' + +const http = HTTP(request) +let closeHttp +/*eslint-disable */ +test.before(async function () { + closeHttp = await server(3100) +}) + +test.after(async function () { + await closeHttp() +}) + +test('request', async function (t) { + const params = {url: 'http://localhost:3100/files/pug.jpg', method: 'HEAD'} + const [{response$}] = demux(http.request(params), 'response$') + const response = await response$.toPromise() + t.deepEqual(response.headers['content-length'], '317235') +}) + +test('request:https', async function (t) { + const params = {url: 'https://localhost:3101/files/pug.jpg', method: 'HEAD', strictSSL: false} + const [{response$}] = demux(http.request(params), 'response$') + const response = await response$.toPromise() + t.deepEqual(response.headers['content-length'], '317235') +}) + +test('requestHead', async function (t) { + const response = await http.requestHead({url: 'http://localhost:3100/files/pug.jpg'}).toPromise() + /** + * To know if the socket is destroyed or not + * https://nodejs.org/api/net.html#net_socket_remoteaddress + */ + t.is(response.socket.remoteAddress, undefined) + t.true(response.socket.destroyed) + + /** + * Check Headers + */ + const headers = response.headers + t.is(headers['x-powered-by'], 'Express') + t.is(headers['accept-ranges'], 'bytes') + t.is(headers['cache-control'], 'public, max-age=0') + // t.is(headers['last-modified'], 'Fri, 29 Jan 2016 15:59:57 GMT') + // t.is(headers['etag'], 'W/"4d733-1528e1cc848"') + t.is(headers['content-type'], 'image/jpeg') + t.is(headers['content-length'], '317235') + t.is(headers['connection'], 'close') + +}) + +/*eslint-enable */ diff --git a/test/integration/test.mtd.js b/test/integration/test.mtd.js index 307a62a..dec5f0b 100644 --- a/test/integration/test.mtd.js +++ b/test/integration/test.mtd.js @@ -5,8 +5,8 @@ 'use strict' import Path from 'path' import test from 'ava' -import {removeFile, createFileDigest} from '../../perf/utils' -import {createDownload} from '../../src/index' +import {removeFile, createFileDigest} from '../../perf/TestHelpers' +import {createDownload} from '../../src/Main' import {server} from '../../perf/server' const pathFactory = () => { @@ -35,32 +35,26 @@ test.after(async function () { }) test('http', async function (t) { - const d = createDownload({ - url: 'http://localhost:3200/files/pug.jpg', - path: path1 - }) - await d.start().toPromise() + await createDownload({url: 'http://localhost:3200/files/pug.jpg', path: path1})[0].toPromise() const digest = await createFileDigest(path1) t.deepEqual(digest, '25FD4542D7FFFB3AEC9EF0D25A533DDE4803B9C1') }) test('https', async function (t) { - const d = createDownload({ + await createDownload({ url: 'https://localhost:3201/files/pug.jpg', path: path2, strictSSL: false - }) - await d.start().toPromise() + })[0].toPromise() const digest = await createFileDigest(path2) t.deepEqual(digest, '25FD4542D7FFFB3AEC9EF0D25A533DDE4803B9C1') }) test('http(2)', async function (t) { - const d = createDownload({ + await createDownload({ url: 'http://localhost:3200/files/in.txt', path: path3 - }) - await d.start().toPromise() + })[0].toPromise() const digest = await createFileDigest(path3) t.deepEqual(digest, 'A9070D71168B5135910A04F0650A91541B72762E') }) diff --git a/test/integration/test.observable.js b/test/integration/test.observable.js deleted file mode 100644 index b467279..0000000 --- a/test/integration/test.observable.js +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Created by tushar.mathur on 21/01/16. - */ - -'use strict' -import {server} from '../../perf/server' -import test from 'ava' -import * as Observables from '../../src/Transformers' - -var closeHttp -/*eslint-disable */ -test.before(async function () { - closeHttp = await server(3100) -}) - -test.after(async function () { - await closeHttp() -}) - -test('requestBody', async function (t) { - const response = await Observables - .requestBody({url: 'http://localhost:3100/files/pug.jpg'}) - .filter((x) => x.event === 'response').pluck('message') - .toPromise() - t.deepEqual(response.headers['content-length'], '317235') -}) - -test('requestBody:https', async function (t) { - const response = await Observables - .requestBody({url: 'https://localhost:3101/files/pug.jpg', method: 'HEAD', strictSSL: false}) - .filter((x) => x.event === 'response').pluck('message') - .toPromise() - t.deepEqual(response.headers['content-length'], '317235') -}) - -test('requestContentLength', async function (t) { - const size = await Observables - .requestContentLength({url: 'https://localhost:3101/fixed-size', strictSSL: false}) - .toPromise() - t.deepEqual(size, 41) -}) - -test('requestHead', async function (t) { - const response = await Observables.requestHead({url: 'http://localhost:3100/files/pug.jpg'}).toPromise() - /** - * To know if the socket is destroyed or not - * https://nodejs.org/api/net.html#net_socket_remoteaddress - */ - t.is(response.socket.remoteAddress, undefined) -}) - -/*eslint-enable */ diff --git a/test/test.CreateMTDFile.js b/test/test.CreateMTDFile.js new file mode 100644 index 0000000..e9223cd --- /dev/null +++ b/test/test.CreateMTDFile.js @@ -0,0 +1,83 @@ +/** + * Created by tushar.mathur on 25/06/16. + */ + +'use strict' + +import {ReactiveTest, TestScheduler} from 'rx' +import test from 'ava' +import {CreateMTDFile} from '../src/CreateMTDFile' +import {demux} from 'muxer' + +/** + * Helpers + */ +const {onNext, onCompleted} = ReactiveTest +const Hot = (sh, ...args) => () => sh.createHotObservable(...args) +const pluck = (key, $) => demux($, key)[0][key] +const MockFILE = (sh) => { + return { + open: Hot(sh, onNext(210, 19), onCompleted(210)), + write: Hot(sh, onNext(230, [1000, 'BUFFER-WRITTEN']), onCompleted(230)) + } +} +const MockHTTP = (sh) => { + return { + requestHead: Hot(sh, + onNext(220, {headers: {'content-length': '9000'}}), + onCompleted(220)) + } +} +const createParams = (sh, options) => ({ + FILE: MockFILE(sh), + HTTP: MockHTTP(sh), + options +}) + +test('meta$', t => { + const sh = new TestScheduler() + const options = {url: '/a/b/c', range: 3} + const params = createParams(sh, options) + const {messages} = sh.startScheduler(() => pluck('meta$', CreateMTDFile(params))) + t.deepEqual(messages, [ + onNext(220, { + url: '/a/b/c', range: 3, totalBytes: 9000, + offsets: [0, 3000, 6000], + threads: [[0, 2999], [3000, 5999], [6000, 9000]] + }), + onCompleted(230) + ]) +}) + +test('written$', t => { + const sh = new TestScheduler() + const options = {url: '/a/b/c', range: 3} + const params = createParams(sh, options) + const {messages} = sh.startScheduler(() => pluck('written$', CreateMTDFile(params))) + t.deepEqual(messages, [ + onNext(230, [1000, 'BUFFER-WRITTEN']), + onCompleted(230) + ]) +}) + +test('remoteFileSize$', t => { + const sh = new TestScheduler() + const options = {url: '/a/b/c', range: 3} + const params = createParams(sh, options) + const {messages} = sh.startScheduler(() => pluck('remoteFileSize$', CreateMTDFile(params))) + t.deepEqual(messages, [ + onNext(220, 9000), + onCompleted(230) + ]) +}) + +test('fdW$', t => { + const sh = new TestScheduler() + const options = {url: '/a/b/c', range: 3} + const params = createParams(sh, options) + const {messages} = sh.startScheduler(() => pluck('fdW$', CreateMTDFile(params))) + t.deepEqual(messages, [ + onNext(210, 19), + onCompleted(230) + ]) +}) diff --git a/test/test.initMeta.js b/test/test.CreateMeta.js similarity index 61% rename from test/test.initMeta.js rename to test/test.CreateMeta.js index ed10c65..62421c7 100644 --- a/test/test.initMeta.js +++ b/test/test.CreateMeta.js @@ -1,8 +1,8 @@ -import {initialize} from '../src/Utils' +import {CreateMeta$} from '../src/Utils' import * as err from '../src/Error' import test from 'ava' import {TestScheduler, ReactiveTest} from 'rx' -import {createTestObserver} from '../perf/utils' +import {createTestObserver} from '../perf/TestHelpers' const {onNext, onCompleted} = ReactiveTest test((t) => { @@ -13,10 +13,8 @@ test((t) => { b: 2 } const sh = new TestScheduler() - const ob = { - requestContentLength: () => sh.createHotObservable(onNext(220, 8000), onCompleted()) - } - const out = createTestObserver(initialize(ob, options)) + const size$ = sh.createHotObservable(onNext(220, 8000), onCompleted()) + const out = createTestObserver(CreateMeta$({size$, options})) sh.start() t.deepEqual(out, [ { @@ -24,7 +22,9 @@ test((t) => { url: 'sample-url', totalBytes: 8000, threads: [[0, 3999], [4000, 8000]], - offsets: [0, 4000] + offsets: [0, 4000], + a: 1, + b: 2 } ]) }) @@ -34,10 +34,8 @@ test('invalid size', (t) => { range: 2, url: 'sample-url', a: 1, b: 2 } const sh = new TestScheduler() - const ob = { - requestContentLength: () => sh.createHotObservable(onNext(220, 'AAA'), onCompleted()) - } - createTestObserver(initialize(ob, options)) + const size$ = sh.createHotObservable(onNext(220, 'AAA'), onCompleted()) + createTestObserver(CreateMeta$({size$, options})) try { sh.start() } catch (e) { diff --git a/test/test.CreateRequestParams.js b/test/test.CreateRequestParams.js new file mode 100644 index 0000000..f84384c --- /dev/null +++ b/test/test.CreateRequestParams.js @@ -0,0 +1,23 @@ +/** + * Created by tushar.mathur on 09/06/16. + */ + +'use strict' + +import test from 'ava' +import {CreateRequestParams} from '../src/Utils' + +test((t) => { + const meta = { + url: '/abc', + offsets: [10, 20, 30], + threads: [[0, 15], [16, 25], [26, 35]], + headers: {a: '1'} + } + const index = 1 + const range = [10, 100] + t.deepEqual( + CreateRequestParams({meta, index, range}), + {url: '/abc', headers: {a: '1', range: 'bytes=20-25'}} + ) +}) diff --git a/test/test.CreateWriteBufferAtParams.js b/test/test.CreateWriteBufferAtParams.js new file mode 100644 index 0000000..f3f33c4 --- /dev/null +++ b/test/test.CreateWriteBufferAtParams.js @@ -0,0 +1,37 @@ +/** + * Created by tushar.mathur on 25/01/16. + */ + +'use strict' + +import {CreateWriteBufferAtParams} from '../src/Utils' +import test from 'ava' +import {TestScheduler, ReactiveTest} from 'rx' +import {spy} from 'sinon' + +const {onNext, onCompleted} = ReactiveTest + +test((t) => { + const fsWrite = spy() + const FILE = {write: x => x.map(fsWrite)} + const sh = new TestScheduler() + const fd$ = sh.createHotObservable(onNext(201, 20), onCompleted(250)) + const buffer$ = sh.createHotObservable( + onNext(210, 'MOCK-BUFFER-10'), + onNext(220, 'MOCK-BUFFER-20'), + onNext(230, 'MOCK-BUFFER-30'), + onCompleted(250) + ) + const position$ = sh.createHotObservable( + onNext(202, 1024), + onNext(222, 2048), + onCompleted(250) + ) + const {messages} = sh.startScheduler(() => CreateWriteBufferAtParams({FILE, fd$, buffer$, position$})) + t.deepEqual(messages, [ + onNext(210, [20, 'MOCK-BUFFER-10', 0, 14, 1024]), + onNext(220, [20, 'MOCK-BUFFER-20', 0, 14, 1024]), + onNext(230, [20, 'MOCK-BUFFER-30', 0, 14, 1024]), + onCompleted(250) + ]) +}) diff --git a/test/test.CreateWriteBufferParams.js b/test/test.CreateWriteBufferParams.js new file mode 100644 index 0000000..b2ac14e --- /dev/null +++ b/test/test.CreateWriteBufferParams.js @@ -0,0 +1,21 @@ +/** + * Created by tushar.mathur on 26/01/16. + */ + +'use strict' + +import test from 'ava' +import {TestScheduler, ReactiveTest} from 'rx' +import {CreateWriteBufferParams} from '../src/Utils' +const {onNext, onCompleted} = ReactiveTest + +test((t) => { + const sh = new TestScheduler() + const fd = 19 + const buffer = ['BUFFER', 1024, 1] + const {messages} = sh.startScheduler(() => CreateWriteBufferParams([fd, buffer])) + t.deepEqual(messages, [ + onNext(200, [19, 'BUFFER', 0, 6, 1024]), + onCompleted(200) + ]) +}) diff --git a/test/test.DownloadFromMTDFile.js b/test/test.DownloadFromMTDFile.js new file mode 100644 index 0000000..d4469cf --- /dev/null +++ b/test/test.DownloadFromMTDFile.js @@ -0,0 +1,95 @@ +/** + * Created by tushar.mathur on 25/06/16. + */ + +'use strict' +import {ReactiveTest, TestScheduler} from 'rx' +import test from 'ava' +import {DownloadFromMTDFile} from '../src/DownloadFromMTDFile' +import {demux} from 'muxer' + +/** + * Helpers + */ +const {onNext, onCompleted} = ReactiveTest +const Hot = (sh, ...args) => () => sh.createHotObservable(...args) +const pluck = (key, $) => demux($, key)[0][key] +const MockMETA = { + threads: [], + offsets: [], + url: '/a/b/c' +} +const MockFILE = (sh) => { + return { + open: Hot(sh, onNext(210, 19), onCompleted(210)), + fstat: Hot(sh, onNext(220, {size: 9000}), onCompleted(220)), + read: Hot(sh, + onNext(230, [25, {toString: () => JSON.stringify(MockMETA)}]), + onCompleted(230) + ), + write: Hot(sh, + onNext(240, 'WRITE-0'), + onNext(250, 'WRITE-1'), + onNext(260, 'WRITE-2'), + onCompleted(260) + ) + } +} +const MockHTTP = (sh) => { + return {} +} +const createParams = (sh, mtdPath) => ({ + FILE: MockFILE(sh), + HTTP: MockHTTP(sh), + mtdPath +}) + +test('localFileSize$', t => { + const sh = new TestScheduler() + const params = createParams(sh, './home/file.mtd') + const {messages} = sh.startScheduler( + () => pluck('localFileSize$', DownloadFromMTDFile(params)) + ) + t.deepEqual(messages, [ + onNext(220, 9000), + onCompleted(260) + ]) +}) + +test('fdR$', t => { + const sh = new TestScheduler() + const params = createParams(sh, './home/file.mtd') + const {messages} = sh.startScheduler( + () => pluck('fdR$', DownloadFromMTDFile(params)) + ) + t.deepEqual(messages, [ + onNext(210, 19), + onCompleted(260) + ]) +}) + +test('metaWritten$', t => { + const sh = new TestScheduler() + const params = createParams(sh, './home/file.mtd') + const {messages} = sh.startScheduler( + () => pluck('metaWritten$', DownloadFromMTDFile(params)) + ) + t.deepEqual(messages, [ + onNext(240, 'WRITE-0'), + onNext(250, 'WRITE-1'), + onNext(260, 'WRITE-2'), + onCompleted(260) + ]) +}) + +test('metaPosition$', t => { + const sh = new TestScheduler() + const params = createParams(sh, './home/file.mtd') + const {messages} = sh.startScheduler( + () => pluck('metaPosition$', DownloadFromMTDFile(params)) + ) + t.deepEqual(messages, [ + onNext(220, (9000 - 512)), + onCompleted(260) + ]) +}) diff --git a/test/test.FinalizeDownload.js b/test/test.FinalizeDownload.js new file mode 100644 index 0000000..864b8d0 --- /dev/null +++ b/test/test.FinalizeDownload.js @@ -0,0 +1,73 @@ +/** + * Created by tushar.mathur on 27/06/16. + */ + +'use strict' +import {ReactiveTest, TestScheduler} from 'rx' +import test from 'ava' +import {FinalizeDownload} from '../src/Utils' +const {onNext, onCompleted} = ReactiveTest + +test('complete', t => { + const sh = new TestScheduler() + const fd$ = sh.createHotObservable(onNext(210, 100), onCompleted(210)) + const meta$ = sh.createHotObservable( + onNext(220, { + threads: [[0, 10], [11, 20]], + offsets: [10, 20] + }), + onCompleted(220) + ) + const truncate$ = sh.createHotObservable( + onNext(300, 'TRUNCATED'), + onCompleted(300) + ) + + const rename$ = sh.createHotObservable( + onNext(400, 'RENAMED'), + onCompleted(400) + ) + const FILE = { + truncate: () => truncate$, + rename: () => rename$ + } + const {messages} = sh.startScheduler( + () => FinalizeDownload({FILE, fd$, meta$}) + ) + t.deepEqual(messages, [ + onNext(300, ['truncated$', 'TRUNCATED']), + onNext(400, ['renamed$', 'RENAMED']), + onCompleted(400) + ]) +}) + +test('incomplete', t => { + const sh = new TestScheduler() + const fd$ = sh.createHotObservable(onNext(210, 100), onCompleted(210)) + const meta$ = sh.createHotObservable( + onNext(220, { + threads: [[0, 10], [11, 20]], + offsets: [5, 20] + }), + onCompleted(220) + ) + const truncate$ = sh.createHotObservable( + onNext(300, 'TRUNCATED'), + onCompleted(300) + ) + const rename$ = sh.createHotObservable( + onNext(400, 'RENAMED'), + onCompleted(400) + ) + const FILE = { + truncate: () => truncate$, + rename: () => rename$ + } + const {messages} = sh.startScheduler( + () => FinalizeDownload({FILE, fd$, meta$}) + ) + t.deepEqual(messages, [ + onNext(220, ['noop$', false]), + onCompleted(220) + ]) +}) diff --git a/test/test.FlattenMeta$.js b/test/test.FlattenMeta$.js new file mode 100644 index 0000000..756c131 --- /dev/null +++ b/test/test.FlattenMeta$.js @@ -0,0 +1,29 @@ +/** + * Created by tushar.mathur on 18/06/16. + */ + +'use strict' + +import test from 'ava' +import {TestScheduler, ReactiveTest} from 'rx' +import {FlattenMeta$} from '../src/Utils' +const {onNext, onCompleted} = ReactiveTest + +test((t) => { + const sh = new TestScheduler() + const meta = { + offsets: [10, 21, 23, 35, 41], + threads: [[0, 10], [11, 20], [21, 30], [31, 40], [41, 50]] + } + const meta$ = sh.createHotObservable( + onNext(210, meta), + onCompleted(220) + ) + const {messages} = sh.startScheduler(() => FlattenMeta$(meta$)) + t.deepEqual(messages, [ + onNext(210, {meta, index: 2}), + onNext(210, {meta, index: 3}), + onNext(210, {meta, index: 4}), + onCompleted(220) + ]) +}) diff --git a/test/test.GetBufferWriteOffset.js b/test/test.GetBufferWriteOffset.js new file mode 100644 index 0000000..946b0cf --- /dev/null +++ b/test/test.GetBufferWriteOffset.js @@ -0,0 +1,30 @@ +/** + * Created by tushar.mathur on 24/06/16. + */ + +'use strict' + +import test from 'ava' +import {TestScheduler, ReactiveTest} from 'rx' +import {GetBufferWriteOffset} from '../src/Utils' +const {onNext, onCompleted} = ReactiveTest + +test(t => { + const sh = new TestScheduler() + const buffer$ = sh.createHotObservable( + onNext(220, 'BUFFER'), + onNext(230, 'BUFFER1'), + onNext(240, 'BUFFER22'), + onNext(250, 'BUFFER333'), + onCompleted(260) + ) + + const {messages} = sh.startScheduler(() => GetBufferWriteOffset({buffer$, initialOffset: 1000})) + t.deepEqual(messages, [ + onNext(220, ['BUFFER', 1000]), + onNext(230, ['BUFFER1', 1006]), + onNext(240, ['BUFFER22', 1013]), + onNext(250, ['BUFFER333', 1021]), + onCompleted(260) + ]) +}) diff --git a/test/test.IsCompleted.js b/test/test.IsCompleted.js new file mode 100644 index 0000000..1fff697 --- /dev/null +++ b/test/test.IsCompleted.js @@ -0,0 +1,41 @@ +/** + * Created by tushar.mathur on 23/06/16. + */ + +'use strict' + +import test from 'ava' +import {TestScheduler, ReactiveTest} from 'rx' +import {IsCompleted$} from '../src/Utils' +const {onNext, onCompleted} = ReactiveTest + +test('eq', t => { + const sh = new TestScheduler() + const threads = [[0, 10], [11, 20], [21, 30]] + const meta$ = sh.createHotObservable( + onNext(210, {threads, offsets: [5, 15, 25]}), + onNext(220, {threads, offsets: [5, 20, 25]}), + onNext(230, {threads, offsets: [10, 20, 30]}), + onCompleted(250) + ) + const {messages} = sh.startScheduler(() => IsCompleted$(meta$)) + t.deepEqual(messages, [ + onNext(210, false), + onNext(230, true), + onCompleted(250) + ]) +}) + +test('gt', t => { + const sh = new TestScheduler() + const threads = [[0, 10], [11, 20], [21, 30]] + const meta$ = sh.createHotObservable( + onNext(210, {threads, offsets: [11, 21, 31]}), + onCompleted(220) + ) + const {messages} = sh.startScheduler(() => IsCompleted$(meta$)) + t.deepEqual(messages, [ + onNext(210, true), + onCompleted(220) + ]) +}) diff --git a/test/test.JSToBuffer$.js b/test/test.JSToBuffer$.js new file mode 100644 index 0000000..59e076d --- /dev/null +++ b/test/test.JSToBuffer$.js @@ -0,0 +1,21 @@ +/** + * Created by tushar.mathur on 09/06/16. + */ + +'use strict' + +import test from 'ava' +import {TestScheduler, ReactiveTest} from 'rx' +import {JSToBuffer$, BUFFER_SIZE} from '../src/Utils' +const {onNext} = ReactiveTest + +test(t => { + let value + const sh = new TestScheduler() + const js = {a: 1, b: 2} + const js$ = sh.createHotObservable(onNext(210, js)) + JSToBuffer$(js$).subscribe(x => (value = x)) + sh.start() + t.is(value.length, BUFFER_SIZE) + t.is(value.toString().trim(), JSON.stringify(js)) +}) diff --git a/test/test.MergeDefaultOptions.js b/test/test.MergeDefaultOptions.js new file mode 100644 index 0000000..c6b7ba3 --- /dev/null +++ b/test/test.MergeDefaultOptions.js @@ -0,0 +1,14 @@ +import {MergeDefaultOptions} from '../src/Utils' +import test from 'ava' + +test((t) => { + const out = MergeDefaultOptions({range: 10, path: 'download.dmg', url: 'sample'}) + t.deepEqual(out, {range: 10, mtdPath: 'download.dmg.mtd', path: 'download.dmg', url: 'sample', metaWrite: 300}) +}) + +test('default:range', (t) => { + const out = MergeDefaultOptions({path: 'download.dmg', url: 'sample'}) + t.deepEqual(out, { + range: 3, mtdPath: 'download.dmg.mtd', path: 'download.dmg', url: 'sample', metaWrite: 300 + }) +}) diff --git a/test/test.ReadFileAt.js b/test/test.ReadFileAt.js new file mode 100644 index 0000000..ce56bdf --- /dev/null +++ b/test/test.ReadFileAt.js @@ -0,0 +1,21 @@ +/** + * Created by tushar.mathur on 09/06/16. + */ + +'use strict' + +import {ReadFileAt$, CreateFilledBuffer} from '../src/Utils' +import test from 'ava' +import {spy} from 'sinon' +import {TestScheduler, ReactiveTest} from 'rx' +const {onNext, onCompleted} = ReactiveTest + +test(t => { + const sh = new TestScheduler() + const fd$ = sh.createHotObservable(onNext(210, 10), onCompleted(220)) + const position$ = sh.createHotObservable(onNext(220, 1024), onCompleted(230)) + const fsRead = spy() + const FILE = {read: x => x.map(fsRead)} + sh.startScheduler(() => ReadFileAt$({FILE, fd$, position$, size: 12})) + t.true(fsRead.calledWith([10, CreateFilledBuffer(12), 0, 12, 1024])) +}) diff --git a/test/test.RequestThread.js b/test/test.RequestThread.js new file mode 100644 index 0000000..02f4427 --- /dev/null +++ b/test/test.RequestThread.js @@ -0,0 +1,77 @@ +/** + * Created by tushar.mathur on 10/06/16. + */ + +'use strict' +import test from 'ava' +import {TestScheduler, ReactiveTest, Observable as O} from 'rx' +import {spy} from 'sinon' +import {mux} from 'muxer' +import {RequestThread} from '../src/Utils' +const {onNext, onCompleted} = ReactiveTest + +test('response$', (t) => { + const sh = new TestScheduler() + const data$ = sh.createHotObservable( + onNext(220, 'BUFFER'), + onNext(230, 'BUFFER1'), + onNext(240, 'BUFFER22'), + onNext(250, 'BUFFER333'), + onCompleted(250) + ) + const response$ = sh.createHotObservable(onNext(210, 'RESPONSE'), onCompleted(210)) + const HTTP = {request: () => mux({data$, response$})} + const meta = { + threads: [[0, 100], [101, 200], [201, 300]], + offsets: [50, 150, 250] + } + const index = 1 + const {messages} = sh.startScheduler( + () => RequestThread(HTTP, {meta, index}) + ) + t.deepEqual(messages, [ + onNext(210, ['response$', 'RESPONSE']), + onNext(220, ['buffer$', ['BUFFER', 150, 1]]), + onNext(230, ['buffer$', ['BUFFER1', 156, 1]]), + onNext(240, ['buffer$', ['BUFFER22', 163, 1]]), + onNext(250, ['buffer$', ['BUFFER333', 171, 1]]), + onCompleted(250) + ]) +}) + +test('request', (t) => { + const sh = new TestScheduler() + const data$ = sh.createHotObservable( + onNext(220, 'BUFFER'), + onCompleted(250) + ) + const response$ = sh.createHotObservable(onNext(210, 'RESPONSE'), onCompleted(210)) + const HTTP = {request: spy(() => mux({data$, response$}))} + const meta = { + url: '/a/b/c', + threads: [[0, 100], [101, 200], [201, 300]], + offsets: [50, 150, 250] + } + const index = 1 + sh.startScheduler(() => RequestThread(HTTP, {meta, index})) + t.true(HTTP.request.calledWith({ + url: '/a/b/c', + headers: {range: 'bytes=150-200'} + })) +}) + +test('curried', (t) => { + const sh = new TestScheduler() + const data$ = O.never() + const response$ = O.never() + const HTTP = {request: () => mux({data$, response$})} + const meta = { + threads: [[0, 100], [101, 200], [201, 300]], + offsets: [50, 150, 250] + } + const index = 1 + /** + * CURRY CALL + */ + sh.startScheduler(() => RequestThread(HTTP)({meta, index})) +}) diff --git a/test/test.RequestWithMeta.js b/test/test.RequestWithMeta.js new file mode 100644 index 0000000..fea8f35 --- /dev/null +++ b/test/test.RequestWithMeta.js @@ -0,0 +1,41 @@ +/** + * Created by tushar.mathur on 26/06/16. + */ + +'use strict' + +import {ReactiveTest, TestScheduler} from 'rx' +import test from 'ava' +import {mux} from 'muxer' +import {RequestWithMeta} from '../src/Utils' +const {onNext, onCompleted} = ReactiveTest + +test(t => { + const sh = new TestScheduler() + const response$ = sh.createHotObservable( + onNext(300, 'RESPONSE'), + onCompleted(300) + ) + const data$ = sh.createHotObservable( + onNext(300, 'BUFFER'), + onNext(310, 'BUFFER'), + onCompleted(310) + ) + const HTTP = {request: () => mux({response$, data$})} + const meta = { + url: '/a/b/c', + threads: [[0, 10]], + offsets: [5] + } + const meta$ = sh.createHotObservable( + onNext(210, meta), + onCompleted(210) + ) + const {messages} = sh.startScheduler(() => RequestWithMeta(HTTP, meta$)) + t.deepEqual(messages, [ + onNext(300, ['response$', 'RESPONSE']), + onNext(300, ['buffer$', ['BUFFER', 5, 0]]), + onNext(310, ['buffer$', ['BUFFER', 11, 0]]), + onCompleted(310) + ]) +}) diff --git a/test/test.RxThrottleComplete.js b/test/test.RxThrottleComplete.js new file mode 100644 index 0000000..e0c82e6 --- /dev/null +++ b/test/test.RxThrottleComplete.js @@ -0,0 +1,29 @@ +/** + * Created by tushar.mathur on 21/06/16. + */ + +'use strict' + +import {RxThrottleComplete} from '../src/Utils' +import test from 'ava' +import {ReactiveTest, TestScheduler} from 'rx' +const {onNext, onCompleted} = ReactiveTest +test(t => { + const sh = new TestScheduler() + const $ = sh.createHotObservable( + onNext(210, 0), + onNext(220, 1), + onNext(230, 2), + onNext(240, 3), + onNext(250, 4), + onCompleted(260) + ) + const window$ = sh.createColdObservable(onNext(0, 30)) + const {messages} = sh.startScheduler(() => RxThrottleComplete(window$, $, sh)) + t.deepEqual(messages, [ + onNext(210, 0), + onNext(240, 3), + onNext(260, 4), + onCompleted(260) + ]) +}) diff --git a/test/test.SetMetaOffsets.js b/test/test.SetMetaOffsets.js new file mode 100644 index 0000000..486dd7c --- /dev/null +++ b/test/test.SetMetaOffsets.js @@ -0,0 +1,33 @@ +/** + * Created by tushar.mathur on 18/06/16. + */ + +'use strict' + +import {SetMetaOffsets} from '../src/Utils' +import test from 'ava' +import {ReactiveTest, TestScheduler} from 'rx' +const {onNext, onCompleted} = ReactiveTest + +test(t => { + const sh = new TestScheduler() + const bufferWritten$ = sh.createHotObservable( + onNext(310, ['BUFFER', 0, 0, 4, 'WRITTEN']), + onNext(320, ['BUFFER', 10, 1, 4, 'WRITTEN']), + onNext(330, ['BUFFER', 20, 2, 4, 'WRITTEN']), + onCompleted(330) + ) + const meta$ = sh.createHotObservable( + onNext(205, {offsets: [0, 10, 20], restParams: '#'}), + onCompleted(205) + ) + const {messages} = sh.startScheduler( + () => SetMetaOffsets({bufferWritten$, meta$}) + ) + t.deepEqual(messages, [ + onNext(310, {offsets: [4, 10, 20], restParams: '#'}), + onNext(320, {offsets: [4, 14, 20], restParams: '#'}), + onNext(330, {offsets: [4, 14, 24], restParams: '#'}), + onCompleted(330) + ]) +}) diff --git a/test/test.SplitRange.js b/test/test.SplitRange.js new file mode 100644 index 0000000..bfd4b4d --- /dev/null +++ b/test/test.SplitRange.js @@ -0,0 +1,12 @@ +/** + * Created by tushar.mathur on 26/01/16. + */ + +'use strict' +import test from 'ava' +import {SplitRange} from '../src/Utils' + +test((t) => { + t.deepEqual(SplitRange(100, 2), [[0, 49], [50, 100]]) + t.deepEqual(SplitRange(100, 3), [[0, 32], [33, 65], [66, 100]]) +}) diff --git a/test/test.WriteBuffer.js b/test/test.WriteBuffer.js new file mode 100644 index 0000000..4c90db9 --- /dev/null +++ b/test/test.WriteBuffer.js @@ -0,0 +1,28 @@ +/** + * Created by tushar.mathur on 27/06/16. + */ + +'use strict' + +import {ReactiveTest, TestScheduler} from 'rx' +import test from 'ava' +import {WriteBuffer} from '../src/Utils' +const {onNext, onCompleted} = ReactiveTest + +test(t => { + const sh = new TestScheduler() + const written$ = sh.createColdObservable(onNext(30, [777, 'WRITTEN']), onCompleted(30)) + const buffer$ = sh.createHotObservable( + onNext(210, ['BUFF', 100, 10]), + onCompleted(210) + ) + const fd$ = sh.createHotObservable( + onNext(220, 19), onCompleted(220) + ) + const FILE = {write: () => written$} + const {messages} = sh.startScheduler(() => WriteBuffer({FILE, buffer$, fd$})) + t.deepEqual(messages, [ + onNext(250, ['BUFF', 100, 10, 777, 'WRITTEN']), + onCompleted(250) + ]) +}) diff --git a/test/test.bufferSave.js b/test/test.bufferSave.js deleted file mode 100644 index a3a06d2..0000000 --- a/test/test.bufferSave.js +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Created by tushar.mathur on 26/01/16. - */ - -'use strict' - -import test from 'ava' -import {TestScheduler, ReactiveTest} from 'rx' -import {bufferSave} from '../src/Utils' -import {createTestObserver} from '../perf/utils' -const {onNext, onCompleted} = ReactiveTest - -test((t) => { - const sh = new TestScheduler() - const ob = { - fsWriteBuffer: () => sh.createHotObservable(onNext(300, 'hello'), onCompleted(310)) - } - const fd = sh.createHotObservable(onNext(210, 1000), onCompleted(220)) - const content = sh.createHotObservable( - onNext(210, {offset: 100, index: 1}), - onNext(220, {offset: 101, index: 2}), - onNext(230, {offset: 102, index: 1}), - onNext(240, {offset: 103, index: 1}), - onCompleted(250) - ) - const out = createTestObserver(bufferSave(ob, fd, content)) - sh.start() - t.deepEqual(out, [ - {offset: 100, fd: 1000, index: 1}, - {offset: 101, fd: 1000, index: 2}, - {offset: 102, fd: 1000, index: 1}, - {offset: 103, fd: 1000, index: 1} - ]) -}) diff --git a/test/test.contentLoad.js b/test/test.contentLoad.js deleted file mode 100644 index 8bd3424..0000000 --- a/test/test.contentLoad.js +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Created by tushar.mathur on 26/01/16. - */ - -'use strict' - -import test from 'ava' -import { TestScheduler, ReactiveTest } from 'rx' -const {onNext, onCompleted} = ReactiveTest -import {contentLoad} from '../src/Utils' -import { createTestObserver } from '../perf/utils' - -const noop = function () {} -test('request', (t) => { - const requests = [] - const scheduler = new TestScheduler() - const requestBody = (x) => { - requests.push(x) - return scheduler.createHotObservable( - onNext(210, {event: 'data', message: '0-AAA'}), - onCompleted(230) - ) - } - const ob = {requestBody} - const threads = [ - [0, 10], - [11, 20], - [21, 30] - ] - const offsets = [0, 11, 21] - const meta = scheduler.createHotObservable(onNext(210, {offsets, threads, url: 'sample-url'}), onCompleted(220)) - contentLoad(ob, meta).subscribe(noop) - scheduler.start() - t.deepEqual(requests, [ - {headers: {range: 'bytes=0-10'}, url: 'sample-url'}, - {headers: {range: 'bytes=11-20'}, url: 'sample-url'}, - {headers: {range: 'bytes=21-30'}, url: 'sample-url'} - ]) -}) - -test('response', (t) => { - const scheduler = new TestScheduler() - const responseBody = { - 'bytes=0-10': scheduler.createHotObservable( - onNext(210, {event: 'data', message: '0000'}), - onNext(220, {event: 'data', message: '00000000'}), - onCompleted(230) - ), - 'bytes=11-20': scheduler.createHotObservable( - onNext(215, {event: 'data', message: '111'}), - onNext(230, {event: 'data', message: '111111'}), - onCompleted(245) - ) - } - const requestBody = (x) => responseBody[x.headers.range] - const ob = {requestBody} - const threads = [ - [0, 10], - [11, 20] - ] - const offsets = [0, 11] - const meta = scheduler.createHotObservable(onNext(200, {offsets, threads, url: 'sample-url'}), onCompleted(250)) - const responses = createTestObserver(contentLoad(ob, meta)) - scheduler.start() - t.deepEqual(responses, [ - {buffer: '0000', offset: 0, range: [0, 10], index: 0}, - {buffer: '111', offset: 11, range: [11, 20], index: 1}, - {buffer: '00000000', offset: 4, range: [0, 10], index: 0}, - {buffer: '111111', offset: 14, range: [11, 20], index: 1} - ]) -}) - -test('offset', (t) => { - const scheduler = new TestScheduler() - const requestBody = (x) => scheduler.createHotObservable( - onNext(210, {event: 'data', message: '0-AAA'}), - onCompleted(230)) - const ob = {requestBody} - const threads = [ - [0, 10], - [11, 20], - [21, 30] - ] - const offsets = [2, 13, 23] - const meta = scheduler.createHotObservable(onNext(210, {offsets, threads, url: 'sample-url'}), onCompleted(220)) - const out = createTestObserver(contentLoad(ob, meta)) - scheduler.start() - t.deepEqual(out, [ - { buffer: '0-AAA', offset: 2, range: [ 0, 10 ], index: 0 }, - { buffer: '0-AAA', offset: 13, range: [ 11, 20 ], index: 1 }, - { buffer: '0-AAA', offset: 23, range: [ 21, 30 ], index: 2 } - ]) -}) diff --git a/test/test.createFD.js b/test/test.createFD.js deleted file mode 100644 index 672d362..0000000 --- a/test/test.createFD.js +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Created by tushar.mathur on 25/01/16. - */ - -'use strict' - -import {createFD} from '../src/Utils' -import test from 'ava' -import { TestScheduler, ReactiveTest } from 'rx' -const {onNext, onCompleted} = ReactiveTest - -test((t) => { - const out = [] - const sh = new TestScheduler() - const fsOpen = (path, flag) => { - out.push({path, flag}) - return sh.createHotObservable(onNext(300, 9000), onCompleted()) - } - const ob = {fsOpen} - const fd = createFD(ob, 'sample-path') - t.deepEqual(out, []) - fd('w').subscribe((x) => out.push(x)) - sh.start() - t.deepEqual(out, [ - {path: 'sample-path', flag: 'w'}, - 9000 - ]) -}) diff --git a/test/test.initParams.js b/test/test.initParams.js deleted file mode 100644 index f0a296f..0000000 --- a/test/test.initParams.js +++ /dev/null @@ -1,14 +0,0 @@ -import {initParams} from '../src/Utils' -import test from 'ava' - -test((t) => { - const out = initParams({range: 10, path: 'download.dmg', url: 'sample'}) - t.deepEqual(out, {range: 10, mtdPath: 'download.dmg.mtd', path: 'download.dmg', url: 'sample'}) -}) - -test('default:range', (t) => { - const out = initParams({path: 'download.dmg', url: 'sample'}) - t.deepEqual(out, { - range: 3, mtdPath: 'download.dmg.mtd', path: 'download.dmg', url: 'sample' - }) -}) diff --git a/test/test.metaSave.js b/test/test.metaSave.js deleted file mode 100644 index 3d6d2c4..0000000 --- a/test/test.metaSave.js +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Created by tushar.mathur on 25/01/16. - */ - -'use strict' - -import {save} from '../src/Utils' -import test from 'ava' -import {TestScheduler, ReactiveTest} from 'rx' -import {createTestObserver} from '../perf/utils' - -const {onNext, onCompleted} = ReactiveTest - -test((t) => { - const scheduler = new TestScheduler() - const fsWriteJSON = (x) => [[null, JSON.stringify(x.json)]] - const fd = scheduler.createHotObservable(onNext(210, 20), onCompleted()) - const json = scheduler.createHotObservable( - onNext(220, {a: 0, totalBytes: 100}), - onNext(230, {a: 1, totalBytes: 100}), - onNext(240, {a: 2, totalBytes: 100}), - onCompleted() - ) - - const ob = {fsWriteJSON} - const out = createTestObserver(save(ob, fd, json)) - scheduler.start() - t.deepEqual(out, [ - {a: 0, totalBytes: 100}, - {a: 1, totalBytes: 100}, - {a: 2, totalBytes: 100} - ]) -}) - -test('delayed:fd', (t) => { - const scheduler = new TestScheduler() - const fsWriteJSON = (x) => [[null, JSON.stringify(x.json)]] - const fd = scheduler.createHotObservable(onNext(250, 20), onCompleted()) - const json = scheduler.createHotObservable( - onNext(220, {a: 0, totalBytes: 100}), - onCompleted() - ) - - const ob = {fsWriteJSON} - const out = createTestObserver(save(ob, fd, json)) - scheduler.start() - t.deepEqual(out, [{a: 0, totalBytes: 100}]) -}) diff --git a/test/test.splitRange.js b/test/test.splitRange.js index 076ee6a..bfd4b4d 100644 --- a/test/test.splitRange.js +++ b/test/test.splitRange.js @@ -4,9 +4,9 @@ 'use strict' import test from 'ava' -import {splitRange} from '../src/Utils' +import {SplitRange} from '../src/Utils' test((t) => { - t.deepEqual(splitRange(100, 2), [[0, 49], [50, 100]]) - t.deepEqual(splitRange(100, 3), [[0, 32], [33, 65], [66, 100]]) + t.deepEqual(SplitRange(100, 2), [[0, 49], [50, 100]]) + t.deepEqual(SplitRange(100, 3), [[0, 32], [33, 65], [66, 100]]) })