/
IO.js
122 lines (110 loc) · 3.15 KB
/
IO.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
'use strict'
import {Observable as O} from 'rx'
import * as Rx from './RxFP'
import {demux} from 'muxer'
import R from 'ramda'
import {Request} from './Request'
export const fromCB = R.compose(R.apply, O.fromNodeCallback)
export const toOB = cb => R.compose(Rx.shareReplay(1), Rx.flatMap(fromCB(cb)))
/**
* Provides wrappers over the async utils inside the
* {@link https://nodejs.org/api/fs.html fs module}.
* The wrappers take in an input stream of arguments
* and returns the result of function call as another stream.
* @namespace FILE
*/
export const FILE = R.curry(fs => {
return {
/**
* @function
* @memberOf FILE
* @param {external:Observable} params$
* @return {external:Observable}
*/
open: toOB(fs.open),
/**
* {@link https://nodejs.org/api/fs.html#fs_fs_open_path_flags_mode_callback}
* @function
* @memberOf FILE
* @param {external:Observable} params$
* @return {external:Observable}
*/
fstat: toOB(fs.fstat),
/**
* {@link https://nodejs.org/api/fs.html#fs_fs_read_fd_buffer_offset_length_position_callback}
* @function
* @memberOf FILE
* @param {external:Observable} params$
* @return {external:Observable}
*/
read: toOB(fs.read),
/**
* {@link https://nodejs.org/api/fs.html#fs_fs_write_fd_buffer_offset_length_position_callback}
* @function
* @memberOf FILE
* @param {external:Observable} params$
* @return {external:Observable}
*/
write: toOB(fs.write),
/**
* {@link https://nodejs.org/api/fs.html#fs_fs_close_fd_callback}
* @function
* @memberOf FILE
* @param {external:Observable} params$
* @return {external:Observable}
*/
close: toOB(fs.close),
/**
* {@link https://nodejs.org/api/fs.html#fs_fs_truncate_path_len_callback}
* @function
* @memberOf FILE
* @param {external:Observable} params$
* @return {external:Observable}
*/
truncate: toOB(fs.truncate),
/**
* {@link https://nodejs.org/api/fs.html#fs_fs_rename_oldpath_newpath_callback}
* @function
* @memberOf FILE
* @param {external:Observable} params$
* @return {external:Observable}
*/
rename: toOB(fs.rename)
}
})
/**
* @namespace HTTP
*/
export const HTTP = R.curry(_request => {
const request = Request(_request)
const requestHead = params => {
const [{response$}] = demux(request(params), 'response$')
return response$
.first()
.tap(x => x.destroy())
.share()
}
const select = R.curry((event, request$) =>
request$.filter(x => x.event === event).pluck('message')
)
return {
requestHead,
select,
/**
* Stream based wrapper over {@link https://www.npmjs.com/package/request npm/request}
* @function
* @memberOf HTTP
* @param {object} params - {@link https://www.npmjs.com/package/request request} module params.
* @return {external:Observable} multiplex stream
*/
request
}
})
export const BAR = R.curry(ProgressBar => {
const bar = new ProgressBar(':bar :percent ', {
total: 1000,
complete: '█',
incomplete: '░'
})
return bar.update.bind(bar)
})