Skip to content

Commit

Permalink
refactor(utils): FinalizeDownload() is triggered only when the downlo…
Browse files Browse the repository at this point in the history
…ad is completed
  • Loading branch information
tusharmath committed Jun 27, 2016
1 parent 5fe8e11 commit c982ea4
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 11 deletions.
38 changes: 27 additions & 11 deletions src/Utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,6 @@ export const RxThrottleComplete = (window$, $, sh) => {
const selector = window => O.merge($.throttle(window, sh), $.last())
return window$.first().flatMap(selector)
}
export const RemoveMeta = ({FILE, meta$, fd$}) => {
const size$ = meta$.pluck('totalBytes')
return FILE.truncate(O.combineLatest(fd$, size$).take(1))
}
export const ResetFileName = ({FILE, meta$}) => {
const params$ = meta$.map(meta => [meta.mtdPath, meta.path]).take(1)
return FILE.rename(params$)
}
export const IsCompleted$ = (meta$) => {
const offsetsA = R.prop('offsets')
const offsetsB = R.compose(R.map(second), R.prop('threads'))
Expand All @@ -193,10 +185,34 @@ export const IsCompleted$ = (meta$) => {
const isComplete = R.converge(diff, [offsetsA, offsetsB])
return meta$.map(isComplete).distinctUntilChanged()
}

/**
* Removes the appended meta data and the .mtd extension from the file. In case
* there still some data leftover to be downloaded, this step will be ignored.
* @function
* @param {Object} FILE - File transformer
* @param {Observable} fd$ - File descriptor observable
* @param {Observable} meta$ - Download meta information
* @returns {Observable}
*/
export const FinalizeDownload = ({FILE, fd$, meta$}) => {
const metaRemoved$ = RemoveMeta({FILE, meta$, fd$})
const renamed$ = metaRemoved$.flatMap(() => ResetFileName({FILE, meta$}))
return mux({metaRemoved$, renamed$})
const [ok$, noop$] = IsCompleted$(meta$).partition(Boolean)
const Truncate = ({FILE, meta$, fd$}) => {
const size$ = meta$.pluck('totalBytes')
return FILE.truncate(O.combineLatest(fd$, size$).take(1))
}
const Rename = ({FILE, meta$}) => {
const params$ = meta$.map(meta => [meta.mtdPath, meta.path]).take(1)
return FILE.rename(params$)
}
return O.merge(
mux({noop$}),
ok$.flatMap(() => {
const truncated$ = Truncate({FILE, meta$, fd$})
const renamed$ = truncated$.flatMap(() => Rename({FILE, meta$}))
return mux({truncated$, renamed$})
})
)
}
export const WriteBuffer = ({FILE, fd$, buffer$}) => {
const Write = R.compose(FILE.write, CreateWriteBufferParams)
Expand Down
73 changes: 73 additions & 0 deletions test/test.FinalizeDownload.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/**
* Created by tushar.mathur on 27/06/16.
*/

'use strict'
import {ReactiveTest, TestScheduler} from 'rx'
import test from 'ava'
import {FinalizeDownload} from '../src/Utils'
const {onNext, onCompleted} = ReactiveTest

test('complete', t => {
const sh = new TestScheduler()
const fd$ = sh.createHotObservable(onNext(210, 100), onCompleted(210))
const meta$ = sh.createHotObservable(
onNext(220, {
threads: [[0, 10], [11, 20]],
offsets: [10, 20]
}),
onCompleted(220)
)
const truncate$ = sh.createHotObservable(
onNext(300, 'TRUNCATED'),
onCompleted(300)
)

const rename$ = sh.createHotObservable(
onNext(400, 'RENAMED'),
onCompleted(400)
)
const FILE = {
truncate: () => truncate$,
rename: () => rename$
}
const {messages} = sh.startScheduler(
() => FinalizeDownload({FILE, fd$, meta$})
)
t.deepEqual(messages, [
onNext(300, ['truncated$', 'TRUNCATED']),
onNext(400, ['renamed$', 'RENAMED']),
onCompleted(400)
])
})

test('incomplete', t => {
const sh = new TestScheduler()
const fd$ = sh.createHotObservable(onNext(210, 100), onCompleted(210))
const meta$ = sh.createHotObservable(
onNext(220, {
threads: [[0, 10], [11, 20]],
offsets: [5, 20]
}),
onCompleted(220)
)
const truncate$ = sh.createHotObservable(
onNext(300, 'TRUNCATED'),
onCompleted(300)
)
const rename$ = sh.createHotObservable(
onNext(400, 'RENAMED'),
onCompleted(400)
)
const FILE = {
truncate: () => truncate$,
rename: () => rename$
}
const {messages} = sh.startScheduler(
() => FinalizeDownload({FILE, fd$, meta$})
)
t.deepEqual(messages, [
onNext(220, ['noop$', false]),
onCompleted(220)
])
})

0 comments on commit c982ea4

Please sign in to comment.