/
getObjectStream.js
84 lines (69 loc) · 1.54 KB
/
getObjectStream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
'use strict';
const { Readable } = require('stream');
const getObjectWrapper = require('./getObjectWrapper');
module.exports = class S3GetObjectStream {
/**
* The buffer size of chunks
*
* @readonly
* @memberof S3GetObjectStream
*/
get bufferSize() {
return 100;
}
/**
* Call the get object stream
*
* @static
* @param {Object} params
* @returns
* @memberof S3GetObjectStream
*/
call(params) {
const getObjectStream = getObjectWrapper.getObject(params);
return Readable.from(this.processChunks(getObjectStream));
}
/**
* Process chunk generators
*
* @static
* @param {Stream} s3Stream
* @memberof S3GetObjectStream
*/
async* processChunks(s3Stream) {
const awaitedStream = await s3Stream;
let buffer = [];
for await (const chunk of awaitedStream) {
buffer.push(chunk);
if(buffer.length < this.bufferSize)
continue;
yield* this.transformByBuffer(buffer);
buffer = [];
}
if(buffer.length)
yield* this.transformByBuffer(buffer);
}
/**
* Transform chunks by buffer information
*
* @static
* @param {Array} buffer
* @memberof S3GetObjectStream
*/
async* transformByBuffer(buffer) {
const processedBuffer = await this.processBuffer(buffer);
for await (const bufferChunk of processedBuffer)
yield Buffer.from(typeof bufferChunk === 'object' ? JSON.stringify(bufferChunk) : bufferChunk);
}
/**
* For process chunks
*
* @static
* @param {Array} buffer
* @returns
* @memberof S3GetObjectStream
*/
async processBuffer(buffer) {
return [...buffer];
}
};