Skip to content

Commit

Permalink
feat: bulk ingest with parallel calls and dynamic backoff
Browse files Browse the repository at this point in the history
  • Loading branch information
walterra committed Oct 12, 2023
1 parent 370a7fc commit 0c7311d
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 17 deletions.
52 changes: 36 additions & 16 deletions src/_index-queue.js
Expand Up @@ -4,6 +4,8 @@ const EventEmitter = require('events');

const queueEmitter = new EventEmitter();

let parallelCalls = 1;

// a simple helper queue to bulk index documents
export default function indexQueueFactory({
targetClient: client,
Expand All @@ -14,36 +16,54 @@ export default function indexQueueFactory({
}) {
let buffer = [];
const queue = [];
let ingesting = false;
let ingesting = 0;
let ingestTimes = [];

const ingest = async b => {
const ingest = b => {
if (typeof b !== 'undefined') {
queue.push(b);
queueEmitter.emit('queue-size', queue.length);
}

if (ingesting === false) {
if (ingestTimes.length > 5) ingestTimes = ingestTimes.slice(-5);

if (ingesting < parallelCalls) {
const docs = queue.shift();

queueEmitter.emit('queue-size', queue.length);
ingesting = true;
if (queue.length <= 5) {
queueEmitter.emit('resume');
}

ingesting += 1;

if (verbose)
console.log(`bulk ingest docs: ${docs.length / 2}, queue length: ${queue.length}`);

try {
await client.bulk({ body: docs });
ingesting = false;
const start = Date.now();
client.bulk({ body: docs }).then(() => {
const end = Date.now();
const delta = end - start;
ingestTimes.push(delta);
ingesting -= 1;

const ingestTimesMovingAverage =
ingestTimes.length > 0 ? ingestTimes.reduce((p, c) => p + c, 0) / ingestTimes.length : 0;
const ingestTimesMovingAverageSeconds = Math.floor(ingestTimesMovingAverage / 1000);

if (ingestTimes.length > 0 && ingestTimesMovingAverageSeconds < 30 && parallelCalls < 10) {
parallelCalls += 1;
} else if (
ingestTimes.length > 0 &&
ingestTimesMovingAverageSeconds >= 30 &&
parallelCalls > 1
) {
parallelCalls -= 1;
}
if (queue.length > 0) {
ingest();
}
} catch (err) {
console.log('bulk index error', err);
}
}

// console.log(`ingest: queue.length ${queue.length}`);
if (queue.length === 0) {
queueEmitter.emit('queue-size', 0);
queueEmitter.emit('resume');
});
}
};

Expand Down
2 changes: 1 addition & 1 deletion src/_index-reader.js
Expand Up @@ -2,7 +2,7 @@ import cliProgress from 'cli-progress';

import { DEFAULT_BUFFER_SIZE } from './_constants';

const MAX_QUEUE_SIZE = 5;
const MAX_QUEUE_SIZE = 15;

// create a new progress bar instance and use shades_classic theme
const progressBar = new cliProgress.SingleBar({}, cliProgress.Presets.shades_classic);
Expand Down

0 comments on commit 0c7311d

Please sign in to comment.