Skip to content

Commit

Permalink
refactor(utils): move CreateMTDFile and DownloadFromMTDFile to their …
Browse files Browse the repository at this point in the history
…own respective files
  • Loading branch information
tusharmath committed Jun 29, 2016
1 parent c245a87 commit 83593e2
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 84 deletions.
41 changes: 41 additions & 0 deletions src/CreateMTDFile.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Created by tushar.mathur on 29/06/16.
*/

'use strict'
import {mux} from 'muxer'
import {Observable as O} from 'rx'
import {
RemoteFileSize$,
CreateMeta$,
CreateWriteBufferAtParams,
JSToBuffer$
} from './Utils'

export const CreateMTDFile = ({FILE, HTTP, options}) => {
/**
* Create a new file
*/
const fd$ = FILE.open(O.just([options.mtdPath, 'w']))

/**
* Retrieve file size on remote server
*/
const size$ = RemoteFileSize$({HTTP, options})

/**
* Create initial meta data
*/
const meta$ = CreateMeta$({options, size$})

/**
* Create a new file with meta info appended at the end
*/
const written$ = FILE.write(CreateWriteBufferAtParams({
FILE,
fd$: fd$,
buffer$: JSToBuffer$(meta$),
position$: size$
}))
return mux({written$, meta$, remoteFileSize$: size$, fdW$: fd$})
}
74 changes: 74 additions & 0 deletions src/DownloadFromMTDFile.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/**
* Created by tushar.mathur on 29/06/16.
*/

'use strict'

import {mux} from 'muxer'
import {Observable as O} from 'rx'
import {
CreateWriteBufferAtParams,
JSToBuffer$,
LocalFileSize$,
MetaPosition$,
ReadJSON$,
demuxFPH,
RequestWithMeta,
WriteBuffer,
SetMetaOffsets,
RxThrottleComplete
} from './Utils'

export const DownloadFromMTDFile = ({FILE, HTTP, mtdPath}) => {
/**
* Open file to read+append
*/
const fd$ = FILE.open(O.just([mtdPath, 'r+']))

/**
* Retrieve File size on disk
*/
const size$ = LocalFileSize$({FILE, fd$})

/**
* Retrieve Meta info
*/
const metaPosition$ = MetaPosition$({size$})
const meta$ = ReadJSON$({FILE, fd$, position$: metaPosition$})

/**
* Make a HTTP request for each thread
*/
const {response$, buffer$} = demuxFPH(
['buffer$', 'response$'], RequestWithMeta(HTTP, meta$)
)

/**
* Create write params and save buffer+offset to disk
*/
const bufferWritten$ = WriteBuffer({FILE, fd$, buffer$})

/**
* Update META info
*/
const nMeta$ = SetMetaOffsets({meta$, bufferWritten$})

/**
* Persist META to disk
*/
const metaWritten$ = FILE.write(CreateWriteBufferAtParams({
fd$,
buffer$: JSToBuffer$(RxThrottleComplete(meta$.pluck('metaWrite'), nMeta$)),
position$: size$
}))

/**
* Create sink$
*/
return mux({
metaWritten$, response$,
localFileSize$: size$,
fdR$: fd$, metaPosition$,
meta$: O.merge(nMeta$, meta$)
})
}
6 changes: 4 additions & 2 deletions src/Main.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import fs from 'graceful-fs'
import {Observable as O} from 'rx'
import R from 'ramda'
import * as U from './Utils'
import {CreateMTDFile} from './CreateMTDFile'
import {DownloadFromMTDFile} from './DownloadFromMTDFile'
import * as T from './IO'
import {mux, demux} from 'muxer'

Expand All @@ -19,15 +21,15 @@ export const createDownload = (_options) => {
/**
* Create MTD File
*/
const createMTDFile$ = U.CreateMTDFile({FILE, HTTP, options}).share()
const createMTDFile$ = CreateMTDFile({FILE, HTTP, options}).share()
const [{fdW$}] = demux(createMTDFile$, 'fdW$')

/**
* Download From MTD File
*/
const downloadFromMTDFile$ = createMTDFile$.last()
.map({HTTP, FILE, mtdPath: options.mtdPath})
.flatMap(U.DownloadFromMTDFile)
.flatMap(DownloadFromMTDFile)
.share()

const [{fdR$, meta$, response$}] = demux(downloadFromMTDFile$, 'meta$', 'fdR$', 'response$')
Expand Down
80 changes: 0 additions & 80 deletions src/Utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -235,83 +235,3 @@ export const RequestWithMeta = R.uncurryN(2, (HTTP) => R.compose(
Rx.flatMap(RequestThread(HTTP)),
FlattenMeta$
))
export const DownloadFromMTDFile = ({FILE, HTTP, mtdPath}) => {
/**
* Open file to read+append
*/
const fd$ = FILE.open(O.just([mtdPath, 'r+']))

/**
* Retrieve File size on disk
*/
const size$ = LocalFileSize$({FILE, fd$})

/**
* Retrieve Meta info
*/
const metaPosition$ = MetaPosition$({size$})
const meta$ = ReadJSON$({FILE, fd$, position$: metaPosition$})

/**
* Make a HTTP request for each thread
*/
const {response$, buffer$} = demuxFPH(
['buffer$', 'response$'], RequestWithMeta(HTTP, meta$)
)

/**
* Create write params and save buffer+offset to disk
*/
const bufferWritten$ = WriteBuffer({FILE, fd$, buffer$})

/**
* Update META info
*/
const nMeta$ = SetMetaOffsets({meta$, bufferWritten$})

/**
* Persist META to disk
*/
const metaWritten$ = FILE.write(CreateWriteBufferAtParams({
fd$,
buffer$: JSToBuffer$(RxThrottleComplete(meta$.pluck('metaWrite'), nMeta$)),
position$: size$
}))

/**
* Create sink$
*/
return mux({
metaWritten$, response$,
localFileSize$: size$,
fdR$: fd$, metaPosition$,
meta$: O.merge(nMeta$, meta$)
})
}
export const CreateMTDFile = ({FILE, HTTP, options}) => {
/**
* Create a new file
*/
const fd$ = FILE.open(O.just([options.mtdPath, 'w']))

/**
* Retreive file size on remote server
*/
const size$ = RemoteFileSize$({HTTP, options})

/**
* Create initial meta data
*/
const meta$ = CreateMeta$({options, size$})

/**
* Create a new file with meta info appended at the end
*/
const written$ = FILE.write(CreateWriteBufferAtParams({
FILE,
fd$: fd$,
buffer$: JSToBuffer$(meta$),
position$: size$
}))
return mux({written$, meta$, remoteFileSize$: size$, fdW$: fd$})
}
2 changes: 1 addition & 1 deletion test/test.CreateMTDFile.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import {ReactiveTest, TestScheduler} from 'rx'
import test from 'ava'
import {CreateMTDFile} from '../src/Utils'
import {CreateMTDFile} from '../src/CreateMTDFile'
import {demux} from 'muxer'

/**
Expand Down
2 changes: 1 addition & 1 deletion test/test.DownloadFromMTDFile.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
'use strict'
import {ReactiveTest, TestScheduler} from 'rx'
import test from 'ava'
import {DownloadFromMTDFile} from '../src/Utils'
import {DownloadFromMTDFile} from '../src/DownloadFromMTDFile'
import {demux} from 'muxer'

/**
Expand Down

0 comments on commit 83593e2

Please sign in to comment.