-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Plat 718 add kafka input output #205
Conversation
Add properties used for creating the topics to consume from and specifying the topics to produce to PLAT-718
The kafkajs file runs on versions above 14 only
Some libraries were not compatible with the old node versions
Codecov Report
@@ Coverage Diff @@
## master #205 +/- ##
==========================================
+ Coverage 96.78% 96.79% +0.01%
==========================================
Files 21 23 +2
Lines 964 1062 +98
==========================================
+ Hits 933 1028 +95
- Misses 31 34 +3
|
The library kafkajs does not have the feature for unsubscribing a topic from a consumer. Work around for this is to disconnect the consumer and start a new instance of the consumer and resubscribe topics that should remain subscribed PLAT-718
Support for kafka has been added to the mediator PLAT-718
The app fails on node lts/gallium due to some libraries not being compatible with this version
This enables us to stub these function in tests
|
||
const subscribeTopicToConsumer = async endpoint => { | ||
if (endpoint.kafkaConsumerTopic) { | ||
await kafkaConsumer.subscribe(endpoint.kafkaConsumerTopic, { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to either wrap this in a try/catch, or make sure that every place that calls it handles the error path (so make sure we call next(err) in the mongo models so we don't wait forever).
Currently if you have a kafka consumer endpoint saved but kafak is not running, if you try and start the project it will die ("uncaught KafkaJSConnectionClosedError: Closed connection").
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will allow for other endpoints to still run and does not create a hard dependency on kafka being up and running for the mapping mediator to function. The only issue now is the consumer will never subscribe if we start the mapping mediator if kafka was down at the same time. It may be a good idea to catch the ConnectionClosedError itself and then write polling logic to try and reconnect once kafka is up and running again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed in 55e9fa3
src/kafka.js
Outdated
const subscribeTopicToConsumer = async endpoint => { | ||
if (endpoint.kafkaConsumerTopic) { | ||
await kafkaConsumer.subscribe(endpoint.kafkaConsumerTopic, { | ||
url: `http://localhost:${port}/${endpoint.endpoint.pattern}`, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
url: `http://localhost:${port}/${endpoint.endpoint.pattern}`, | |
url: `http://localhost:${port}${endpoint.endpoint.pattern}`, |
Since endpoint.endpoint.pattern
follows the pattern: "/pattern" we should remove the / otherwise we end up with //
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed in 55e9fa3
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good overall, just the one comment I left on the subscriber side of things
No description provided.