@@ -4,7 +4,9 @@ const Promise = require('bluebird')
4
4
5
5
const index = require ( './lib/entry-index' )
6
6
const finished = Promise . promisify ( require ( 'mississippi' ) . finished )
7
+ const memo = require ( './lib/memoization' )
7
8
const pipe = require ( 'mississippi' ) . pipe
9
+ const pipeline = require ( 'mississippi' ) . pipeline
8
10
const read = require ( './lib/content/read' )
9
11
const through = require ( 'mississippi' ) . through
10
12
@@ -16,39 +18,137 @@ module.exports.byDigest = function getByDigest (cache, digest, opts) {
16
18
}
17
19
function getData ( byDigest , cache , key , opts ) {
18
20
opts = opts || { }
19
- const src = ( byDigest ? getStream . byDigest : getStream ) ( cache , key , opts )
20
- let data = ''
21
- let meta
22
- src . on ( 'data' , function ( d ) { data += d } )
23
- src . on ( 'metadata' , function ( m ) { meta = m } )
24
- return finished ( src ) . then ( ( ) => ( { data, meta } ) )
21
+ opts . hashAlgorithm = opts . hashAlgorithm || 'sha1'
22
+ const memoized = (
23
+ byDigest
24
+ ? memo . get . byDigest ( cache , key , opts . hashAlgorithm )
25
+ : memo . get ( cache , key )
26
+ )
27
+ if ( memoized && opts . memoize !== false ) {
28
+ return Promise . resolve ( {
29
+ metadata : memoized . entry . metadata ,
30
+ data : memoized . data ,
31
+ digest : memoized . entry . digest ,
32
+ hashAlgorithm : memoized . entry . hashAlgorithm
33
+ } )
34
+ }
35
+ const src = ( byDigest ? getStreamDigest : getStream ) ( cache , key , opts )
36
+ let acc = [ ]
37
+ let dataTotal = 0
38
+ let metadata
39
+ let digest
40
+ let hashAlgorithm
41
+ if ( ! byDigest ) {
42
+ src . on ( 'digest' , d => {
43
+ digest = d
44
+ } )
45
+ src . on ( 'hashAlgorithm' , d => { hashAlgorithm = d } )
46
+ src . on ( 'metadata' , d => { metadata = d } )
47
+ }
48
+ src . on ( 'data' , d => {
49
+ acc . push ( d )
50
+ dataTotal += d . length
51
+ } )
52
+ return finished ( src ) . then ( ( ) => {
53
+ const data = Buffer . concat ( acc , dataTotal )
54
+ return byDigest ? data : { metadata, data, digest, hashAlgorithm }
55
+ } )
25
56
}
26
57
27
58
module . exports . stream = getStream
28
- module . exports . stream . byDigest = read . readStream
29
59
function getStream ( cache , key , opts ) {
30
- const stream = through ( )
31
- index . find ( cache , key ) . catch ( err => {
32
- stream . emit ( 'error' , err )
33
- } ) . then ( data => {
34
- if ( ! data ) {
60
+ opts = opts || { }
61
+ let stream = through ( )
62
+ const memoized = memo . get ( cache , key )
63
+ if ( memoized && opts . memoize !== false ) {
64
+ stream . on ( 'newListener' , function ( ev , cb ) {
65
+ ev === 'metadata' && cb ( memoized . entry . metadata )
66
+ ev === 'digest' && cb ( memoized . entry . digest )
67
+ ev === 'hashAlgorithm' && cb ( memoized . entry . hashAlgorithm )
68
+ } )
69
+ stream . write ( memoized . data , ( ) => stream . end ( ) )
70
+ return stream
71
+ }
72
+ index . find ( cache , key ) . then ( entry => {
73
+ if ( ! entry ) {
35
74
return stream . emit (
36
75
'error' , index . notFoundError ( cache , key )
37
76
)
38
77
}
39
- stream . emit ( 'metadata' , data )
78
+ let memoStream
79
+ if ( opts . memoize ) {
80
+ let memoData = [ ]
81
+ let memoLength = 0
82
+ memoStream = through ( ( c , en , cb ) => {
83
+ memoData && memoData . push ( c )
84
+ memoLength += c . length
85
+ cb ( null , c , en )
86
+ } , cb => {
87
+ memoData && memo . put ( cache , entry , Buffer . concat ( memoData , memoLength ) )
88
+ cb ( )
89
+ } )
90
+ } else {
91
+ memoStream = through ( )
92
+ }
93
+ // TODO - don't overwrite someone else's `opts`.
94
+ opts . hashAlgorithm = entry . hashAlgorithm
95
+ stream . emit ( 'metadata' , entry . metadata )
96
+ stream . emit ( 'hashAlgorithm' , entry . hashAlgorithm )
97
+ stream . emit ( 'digest' , entry . digest )
40
98
stream . on ( 'newListener' , function ( ev , cb ) {
41
- ev === 'metadata' && cb ( data )
99
+ ev === 'metadata' && cb ( entry . metadata )
100
+ ev === 'digest' && cb ( entry . digest )
101
+ ev === 'hashAlgorithm' && cb ( entry . hashAlgorithm )
42
102
} )
43
103
pipe (
44
- read . readStream ( cache , data . digest , opts ) ,
104
+ read . readStream ( cache , entry . digest , opts ) ,
105
+ memoStream ,
45
106
stream
46
107
)
47
- } )
108
+ } , err => stream . emit ( 'error' , err ) )
48
109
return stream
49
110
}
50
111
112
+ module . exports . stream . byDigest = getStreamDigest
113
+ function getStreamDigest ( cache , digest , opts ) {
114
+ opts = opts || { }
115
+ opts . hashAlgorithm = opts . hashAlgorithm || 'sha1'
116
+ const memoized = memo . get . byDigest ( cache , digest , opts . hashAlgorithm )
117
+ if ( memoized && opts . memoize !== false ) {
118
+ const stream = through ( )
119
+ stream . write ( memoized , ( ) => stream . end ( ) )
120
+ return stream
121
+ } else {
122
+ let stream = read . readStream ( cache , digest , opts )
123
+ if ( opts . memoize ) {
124
+ let memoData = [ ]
125
+ let memoLength = 0
126
+ const memoStream = through ( ( c , en , cb ) => {
127
+ memoData && memoData . push ( c )
128
+ memoLength += c . length
129
+ cb ( null , c , en )
130
+ } , cb => {
131
+ memoData && memo . put . byDigest (
132
+ cache ,
133
+ digest ,
134
+ opts . hashAlgorithm ,
135
+ Buffer . concat ( memoData , memoLength )
136
+ )
137
+ cb ( )
138
+ } )
139
+ stream = pipeline ( stream , memoStream )
140
+ }
141
+ return stream
142
+ }
143
+ }
144
+
51
145
module . exports . info = info
52
- function info ( cache , key ) {
53
- return index . find ( cache , key )
146
+ function info ( cache , key , opts ) {
147
+ opts = opts || { }
148
+ const memoized = memo . get ( cache , key )
149
+ if ( memoized && opts . memoize !== false ) {
150
+ return Promise . resolve ( memoized . entry )
151
+ } else {
152
+ return index . find ( cache , key )
153
+ }
54
154
}
0 commit comments