diff --git a/.gitignore b/.gitignore index 355928d..40780d8 100644 --- a/.gitignore +++ b/.gitignore @@ -40,3 +40,4 @@ jspm_packages logs/ certs/ .vscode/*.sql +.vscode/settings.json diff --git a/lib/client/NativeKafkaClient.js b/lib/client/NativeKafkaClient.js index 5323960..99a184e 100644 --- a/lib/client/NativeKafkaClient.js +++ b/lib/client/NativeKafkaClient.js @@ -104,15 +104,15 @@ class NativeKafkaClient extends KafkaClient { debug("consumer ready"); if (withBackPressure) { - this.consumer.consume((message, done) => { + return this.consumer.consume((message, done) => { super.emit("message", message); done(); - }, false, false, this.batchOptions).catch(e => kafkaErrorCallback(e)); + }, false, false, this.batchOptions); } else { - this.consumer.consume().catch(e => kafkaErrorCallback(e)); this.consumer.on("message", message => { super.emit("message", message); }); + return this.consumer.consume(); } }).catch(e => kafkaErrorCallback(e)); });