Skip to content

Commit

Permalink
fix: fix event handling for file-reader
Browse files Browse the repository at this point in the history
  • Loading branch information
walterra committed Oct 30, 2023
1 parent e57b9a0 commit 5f472b3
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 5 deletions.
24 changes: 22 additions & 2 deletions src/_file-reader.js
Expand Up @@ -2,16 +2,20 @@ import fs from 'fs';
import es from 'event-stream';
import glob from 'glob';

const MAX_QUEUE_SIZE = 15;

export default function fileReaderFactory(indexer, fileName, transform, splitRegex, verbose) {
function startIndex(files) {
let ingestQueueSize = 0;
let finished = false;

const file = files.shift();
const s = fs
.createReadStream(file)
.pipe(es.split(splitRegex))
.pipe(
es
.mapSync(line => {
s.pause();
try {
const doc = typeof transform === 'function' ? transform(line) : line;
// if doc is undefined we'll skip indexing it
Expand All @@ -37,14 +41,30 @@ export default function fileReaderFactory(indexer, fileName, transform, splitReg
})
.on('end', () => {
if (verbose) console.log('Read entire file: ', file);
indexer.finish();
if (files.length > 0) {
startIndex(files);
return;
}

indexer.finish();
finished = true;
}),
);

indexer.queueEmitter.on('queue-size', async size => {
if (finished) return;
ingestQueueSize = size;

if (ingestQueueSize < MAX_QUEUE_SIZE) {
s.resume();
} else {
s.pause();
}
});

indexer.queueEmitter.on('resume', () => {
if (finished) return;
ingestQueueSize = 0;
s.resume();
});
}
Expand Down
14 changes: 11 additions & 3 deletions src/_index-queue.js
Expand Up @@ -18,6 +18,7 @@ export default function indexQueueFactory({
const queue = [];
let ingesting = 0;
let ingestTimes = [];
let finished = false;

const ingest = b => {
if (typeof b !== 'undefined') {
Expand Down Expand Up @@ -68,8 +69,11 @@ export default function indexQueueFactory({
) {
parallelCalls -= 1;
}

if (queue.length > 0) {
ingest();
} else if (queue.length === 0 && finished) {
queueEmitter.emit('finish');
}
})
.catch(error => {
Expand All @@ -85,6 +89,10 @@ export default function indexQueueFactory({

return {
add: doc => {
if (finished) {
throw new Error('Unexpected doc added after indexer should finish.');
}

if (!skipHeader) {
const header = { index: { _index: targetIndexName } };
buffer.push(header);
Expand All @@ -101,10 +109,10 @@ export default function indexQueueFactory({
buffer = [];
}
},
finish: async () => {
await ingest(buffer);
finish: () => {
ingest(buffer);
finished = true;
buffer = [];
queueEmitter.emit('finish');
},
queueEmitter,
};
Expand Down

0 comments on commit 5f472b3

Please sign in to comment.