From 83593e2c5d80a7a2d5767f9d437a8fbd6fdd01e8 Mon Sep 17 00:00:00 2001 From: tushar mathur Date: Wed, 29 Jun 2016 13:56:06 +0530 Subject: [PATCH] refactor(utils): move CreateMTDFile and DownloadFromMTDFile to their own respective files --- src/CreateMTDFile.js | 41 ++++++++++++++++ src/DownloadFromMTDFile.js | 74 +++++++++++++++++++++++++++++ src/Main.js | 6 ++- src/Utils.js | 80 -------------------------------- test/test.CreateMTDFile.js | 2 +- test/test.DownloadFromMTDFile.js | 2 +- 6 files changed, 121 insertions(+), 84 deletions(-) create mode 100644 src/CreateMTDFile.js create mode 100644 src/DownloadFromMTDFile.js diff --git a/src/CreateMTDFile.js b/src/CreateMTDFile.js new file mode 100644 index 0000000..79499c9 --- /dev/null +++ b/src/CreateMTDFile.js @@ -0,0 +1,41 @@ +/** + * Created by tushar.mathur on 29/06/16. + */ + +'use strict' +import {mux} from 'muxer' +import {Observable as O} from 'rx' +import { + RemoteFileSize$, + CreateMeta$, + CreateWriteBufferAtParams, + JSToBuffer$ +} from './Utils' + +export const CreateMTDFile = ({FILE, HTTP, options}) => { + /** + * Create a new file + */ + const fd$ = FILE.open(O.just([options.mtdPath, 'w'])) + + /** + * Retrieve file size on remote server + */ + const size$ = RemoteFileSize$({HTTP, options}) + + /** + * Create initial meta data + */ + const meta$ = CreateMeta$({options, size$}) + + /** + * Create a new file with meta info appended at the end + */ + const written$ = FILE.write(CreateWriteBufferAtParams({ + FILE, + fd$: fd$, + buffer$: JSToBuffer$(meta$), + position$: size$ + })) + return mux({written$, meta$, remoteFileSize$: size$, fdW$: fd$}) +} diff --git a/src/DownloadFromMTDFile.js b/src/DownloadFromMTDFile.js new file mode 100644 index 0000000..86c8ae7 --- /dev/null +++ b/src/DownloadFromMTDFile.js @@ -0,0 +1,74 @@ +/** + * Created by tushar.mathur on 29/06/16. + */ + +'use strict' + +import {mux} from 'muxer' +import {Observable as O} from 'rx' +import { + CreateWriteBufferAtParams, + JSToBuffer$, + LocalFileSize$, + MetaPosition$, + ReadJSON$, + demuxFPH, + RequestWithMeta, + WriteBuffer, + SetMetaOffsets, + RxThrottleComplete +} from './Utils' + +export const DownloadFromMTDFile = ({FILE, HTTP, mtdPath}) => { + /** + * Open file to read+append + */ + const fd$ = FILE.open(O.just([mtdPath, 'r+'])) + + /** + * Retrieve File size on disk + */ + const size$ = LocalFileSize$({FILE, fd$}) + + /** + * Retrieve Meta info + */ + const metaPosition$ = MetaPosition$({size$}) + const meta$ = ReadJSON$({FILE, fd$, position$: metaPosition$}) + + /** + * Make a HTTP request for each thread + */ + const {response$, buffer$} = demuxFPH( + ['buffer$', 'response$'], RequestWithMeta(HTTP, meta$) + ) + + /** + * Create write params and save buffer+offset to disk + */ + const bufferWritten$ = WriteBuffer({FILE, fd$, buffer$}) + + /** + * Update META info + */ + const nMeta$ = SetMetaOffsets({meta$, bufferWritten$}) + + /** + * Persist META to disk + */ + const metaWritten$ = FILE.write(CreateWriteBufferAtParams({ + fd$, + buffer$: JSToBuffer$(RxThrottleComplete(meta$.pluck('metaWrite'), nMeta$)), + position$: size$ + })) + + /** + * Create sink$ + */ + return mux({ + metaWritten$, response$, + localFileSize$: size$, + fdR$: fd$, metaPosition$, + meta$: O.merge(nMeta$, meta$) + }) +} diff --git a/src/Main.js b/src/Main.js index 77703ce..1ddd925 100644 --- a/src/Main.js +++ b/src/Main.js @@ -7,6 +7,8 @@ import fs from 'graceful-fs' import {Observable as O} from 'rx' import R from 'ramda' import * as U from './Utils' +import {CreateMTDFile} from './CreateMTDFile' +import {DownloadFromMTDFile} from './DownloadFromMTDFile' import * as T from './IO' import {mux, demux} from 'muxer' @@ -19,7 +21,7 @@ export const createDownload = (_options) => { /** * Create MTD File */ - const createMTDFile$ = U.CreateMTDFile({FILE, HTTP, options}).share() + const createMTDFile$ = CreateMTDFile({FILE, HTTP, options}).share() const [{fdW$}] = demux(createMTDFile$, 'fdW$') /** @@ -27,7 +29,7 @@ export const createDownload = (_options) => { */ const downloadFromMTDFile$ = createMTDFile$.last() .map({HTTP, FILE, mtdPath: options.mtdPath}) - .flatMap(U.DownloadFromMTDFile) + .flatMap(DownloadFromMTDFile) .share() const [{fdR$, meta$, response$}] = demux(downloadFromMTDFile$, 'meta$', 'fdR$', 'response$') diff --git a/src/Utils.js b/src/Utils.js index 5642f5e..72e86bd 100644 --- a/src/Utils.js +++ b/src/Utils.js @@ -235,83 +235,3 @@ export const RequestWithMeta = R.uncurryN(2, (HTTP) => R.compose( Rx.flatMap(RequestThread(HTTP)), FlattenMeta$ )) -export const DownloadFromMTDFile = ({FILE, HTTP, mtdPath}) => { - /** - * Open file to read+append - */ - const fd$ = FILE.open(O.just([mtdPath, 'r+'])) - - /** - * Retrieve File size on disk - */ - const size$ = LocalFileSize$({FILE, fd$}) - - /** - * Retrieve Meta info - */ - const metaPosition$ = MetaPosition$({size$}) - const meta$ = ReadJSON$({FILE, fd$, position$: metaPosition$}) - - /** - * Make a HTTP request for each thread - */ - const {response$, buffer$} = demuxFPH( - ['buffer$', 'response$'], RequestWithMeta(HTTP, meta$) - ) - - /** - * Create write params and save buffer+offset to disk - */ - const bufferWritten$ = WriteBuffer({FILE, fd$, buffer$}) - - /** - * Update META info - */ - const nMeta$ = SetMetaOffsets({meta$, bufferWritten$}) - - /** - * Persist META to disk - */ - const metaWritten$ = FILE.write(CreateWriteBufferAtParams({ - fd$, - buffer$: JSToBuffer$(RxThrottleComplete(meta$.pluck('metaWrite'), nMeta$)), - position$: size$ - })) - - /** - * Create sink$ - */ - return mux({ - metaWritten$, response$, - localFileSize$: size$, - fdR$: fd$, metaPosition$, - meta$: O.merge(nMeta$, meta$) - }) -} -export const CreateMTDFile = ({FILE, HTTP, options}) => { - /** - * Create a new file - */ - const fd$ = FILE.open(O.just([options.mtdPath, 'w'])) - - /** - * Retreive file size on remote server - */ - const size$ = RemoteFileSize$({HTTP, options}) - - /** - * Create initial meta data - */ - const meta$ = CreateMeta$({options, size$}) - - /** - * Create a new file with meta info appended at the end - */ - const written$ = FILE.write(CreateWriteBufferAtParams({ - FILE, - fd$: fd$, - buffer$: JSToBuffer$(meta$), - position$: size$ - })) - return mux({written$, meta$, remoteFileSize$: size$, fdW$: fd$}) -} diff --git a/test/test.CreateMTDFile.js b/test/test.CreateMTDFile.js index 9c09f1a..e9223cd 100644 --- a/test/test.CreateMTDFile.js +++ b/test/test.CreateMTDFile.js @@ -6,7 +6,7 @@ import {ReactiveTest, TestScheduler} from 'rx' import test from 'ava' -import {CreateMTDFile} from '../src/Utils' +import {CreateMTDFile} from '../src/CreateMTDFile' import {demux} from 'muxer' /** diff --git a/test/test.DownloadFromMTDFile.js b/test/test.DownloadFromMTDFile.js index 8648e59..d4469cf 100644 --- a/test/test.DownloadFromMTDFile.js +++ b/test/test.DownloadFromMTDFile.js @@ -5,7 +5,7 @@ 'use strict' import {ReactiveTest, TestScheduler} from 'rx' import test from 'ava' -import {DownloadFromMTDFile} from '../src/Utils' +import {DownloadFromMTDFile} from '../src/DownloadFromMTDFile' import {demux} from 'muxer' /**