This repository has been archived by the owner on Oct 30, 2018. It is now read-only.
/
file-demuxer.js
173 lines (140 loc) · 4.58 KB
/
file-demuxer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
'use strict';
var inherits = require('util').inherits;
var assert = require('assert');
var stream = require('readable-stream');
var fs = require('fs');
var EventEmitter = require('events').EventEmitter;
var merge = require('merge');
var utils = require('../utils');
var os = require('os');
/**
 * Reads a single file and splits ("shreds") it into several consecutive
 * output streams, announcing each one through a `shard` event
 * @constructor
 * @license LGPL-3.0
 * @param {String} filePath - Path to the file to demultiplex
 * @param {Object} options
 * @param {Number} options.shardSize - Size of each shard
 * @fires FileDemuxer#shard
 */
function FileDemuxer(filePath, options) {
  // Allow the constructor to be invoked without `new`
  if (this instanceof FileDemuxer === false) {
    return new FileDemuxer(filePath, options);
  }

  var fileIsPresent = utils.existsSync(filePath);

  assert(fileIsPresent, 'File does not exist at the supplied path');

  // The caller's options are merged with an object whose prototype is
  // FileDemuxer.DEFAULTS
  var settings = merge(Object.create(FileDemuxer.DEFAULTS), options);

  this._fileSize = fs.statSync(filePath).size;
  this._filePosition = 0;
  this._shardSize = settings.shardSize;
  this._source = fs.createReadStream(filePath);
  this._currentShardIndex = 0;

  EventEmitter.call(this);

  // The source listeners are attached on a later turn of the event loop
  // NOTE(review): presumably so callers can subscribe to events first - confirm
  var openStream = this._openStream.bind(this);

  setImmediate(openStream);
}
// Number of power-of-two steps getOptimalShardSize moves back from the
// accumulator it stops at (never going below zero)
FileDemuxer.SHARD_MULTIPLES_BACK = 5;

// Options applied when the caller does not override them; shardSize is 8 MiB
FileDemuxer.DEFAULTS = {
  shardSize: 8 * 1024 * 1024
};
/**
* Triggered when the demuxer has a shard ready to stream
* @event FileDemuxer#shard
* @param {ReadableStream} shard - The file shard as a readable stream
*/
/**
* Triggered when the demuxer has finished writing to all shards
* @event FileDemuxer#finish
*/
// Set up prototype inheritance from EventEmitter (via util.inherits) so that
// instances can use `emit`; the constructor itself calls EventEmitter.call(this)
inherits(FileDemuxer, EventEmitter);
/**
 * Opens the underlying readable stream by attaching the demuxer's handlers to
 * it. An empty file is rejected with an `error` event instead, and failures
 * reported by the source stream are re-emitted as `error` on the demuxer.
 * @private
 * @returns {Boolean|undefined} Result of `emit` for an empty file, otherwise
 * nothing
 */
FileDemuxer.prototype._openStream = function() {
  var self = this;

  if (this._fileSize === 0) {
    // An output stream is still created before the error is reported
    this._currentOutput = new stream.Readable({ read: utils.noop });
    return this.emit('error', new Error('File size cannot be 0 Bytes.'));
  }

  this._source.on('data', this._handleSourceBytes.bind(this));
  this._source.on('end', this._handleSourceEnded.bind(this));
  // Without a listener a read failure would be thrown as an uncaught exception
  // from a stream the caller has no access to; surface it on the demuxer
  this._source.on('error', function(err) {
    self.emit('error', err);
  });
};
/**
 * Receives a chunk from the source stream, makes sure there is an output
 * stream for it (announcing new ones with a `shard` event) and schedules the
 * chunk to be pushed to that output
 * @private
 * @param {Buffer} chunk - Data read from the source stream
 */
FileDemuxer.prototype._handleSourceBytes = function(chunk) {
  var self = this;
  var pushBytes = self._pushBytesToOutput;

  // Creates a fresh output stream and announces it under the given index
  function announceShard(index) {
    self._currentOutput = new stream.Readable({ read: utils.noop });
    self.emit('shard', self._currentOutput, index);
  }

  // Very first chunk: nothing has been announced yet
  if (!self._currentOutput) {
    announceShard(self._currentShardIndex);
  }

  // The bytes pushed so far have moved past the current shard: end it and
  // start the next one
  if (self._needsNewOutputStream()) {
    self._closeCurrentOutput();
    announceShard(++self._currentShardIndex);
  }

  // The push itself is deferred to a later turn of the event loop
  setImmediate(function() {
    pushBytes.call(self, chunk);
  });
};
/**
 * Runs when the source stream has ended: closes the output stream that is
 * currently open, then emits `finish`
 * @private
 */
FileDemuxer.prototype._handleSourceEnded = function() {
  var demuxer = this;

  demuxer._closeCurrentOutput();
  demuxer.emit('finish');
};
/**
 * Hands the given bytes to the current output stream, advancing the recorded
 * file position by their length first when there are any
 * @private
 * @param {Buffer|null} bytes - Data to push; a falsy value is pushed as-is
 * without moving the file position
 */
FileDemuxer.prototype._pushBytesToOutput = function(bytes) {
  var hasPayload = Boolean(bytes);

  if (hasPayload) {
    this._filePosition = this._filePosition + bytes.length;
  }

  this._currentOutput.push(bytes);
};
/**
 * Ends the current output stream by pushing `null` to it through
 * _pushBytesToOutput
 * @private
 */
FileDemuxer.prototype._closeCurrentOutput = function() {
  var END_OF_STREAM = null;

  this._pushBytesToOutput(END_OF_STREAM);
};
/**
 * Tells whether the recorded file position has moved past the shard that is
 * currently open, meaning a new shard stream should be created
 * @private
 * @returns {Boolean}
 */
FileDemuxer.prototype._needsNewOutputStream = function() {
  // Index of the shard that the current file position falls into
  var shardAtPosition = Math.floor(this._filePosition / this._shardSize);

  return shardAtPosition > this._currentShardIndex;
};
/**
 * Determine the optimal shard size given an arbitrary file size in bytes.
 * Candidate sizes are 8 MiB multiplied by a power of two. The search finds the
 * smallest candidate that is at least as large as the file, moves
 * FileDemuxer.SHARD_MULTIPLES_BACK powers of two back from it (never below
 * 8 MiB) and then keeps halving while free memory could hold no more than
 * three shards of that size.
 * @param {Number} fileSize - The number of bytes in the given file
 * @param {Number} [acc=0] - Power of two at which the search starts
 * @returns {Number} shardSize
 * @throws {RangeError} If fileSize is not a positive, finite number
 */
FileDemuxer.getOptimalShardSize = function(fileSize, acc) {
  var BASE_SHARD_SIZE = 8 * (1024 * 1024);
  // TODO: Change 3 to (shard concurrency * file concurrency)
  var MIN_SHARDS_IN_MEMORY = 3;

  // Calculate the candidate shard size for a given power of two
  var shardSizeFor = function(exponent) {
    return BASE_SHARD_SIZE * Math.pow(2, exponent);
  };

  // No candidate can ever cover these inputs, so the search would never stop;
  // fail immediately with a clear message instead of exhausting the stack
  if (!(fileSize > 0) || !isFinite(fileSize)) {
    throw new RangeError(
      'File size must be a positive, finite number of bytes'
    );
  }

  var accumulator = typeof acc === 'undefined' ? 0 : acc;

  // Find the smallest candidate that is >= fileSize. Iterating (rather than
  // recursing through `this`) also keeps the function usable when detached
  while (fileSize / shardSizeFor(accumulator) > 1) {
    accumulator++;
  }

  // Determine hops back by accumulator
  var hops = Math.max(accumulator - FileDemuxer.SHARD_MULTIPLES_BACK, 0);

  // Shrink the shard while free memory holds too few shards of this size
  while (
    hops > 0 &&
    (os.freemem() / shardSizeFor(hops) <= MIN_SHARDS_IN_MEMORY)
  ) {
    hops--;
  }

  return shardSizeFor(hops);
};
// The constructor itself is the module's export
module.exports = FileDemuxer;