-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/zenko 266 ingestion consumer #332
Feature/zenko 266 ingestion consumer #332
Conversation
Hello JianqinWang,My role is to assist you with the merge of this Status report is not available. |
Waiting for approvalThe following approvals are needed before I can proceed with the merge:
|
PR has been updated. Reviewers, please be cautious. |
1 similar comment
PR has been updated. Reviewers, please be cautious. |
3b6789c
to
be1c564
Compare
PR has been updated. Reviewers, please be cautious. |
be1c564
to
21fb915
Compare
PR has been updated. Reviewers, please be cautious. |
lib/models/QueueEntry.js
Outdated
@@ -1,9 +1,9 @@ | |||
const { usersBucket } = require('arsenal').constants; | |||
|
|||
const ObjectQueueEntry = | |||
require('../../extensions/replication/utils/ObjectQueueEntry'); | |||
require('../../extensions/utils/ObjectQueueEntry'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we're moving those helper classes, let's move them to lib/models/
instead beside QueueEntry
class.
extensions/index.js
Outdated
@@ -5,7 +5,7 @@ const extensions = {}; | |||
|
|||
fs.readdirSync(__dirname).forEach(moduleDir => { | |||
const extStat = fs.statSync(path.join(__dirname, moduleDir)); | |||
if (extStat.isDirectory()) { | |||
if (extStat.isDirectory() && moduleDir !== 'utils') { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: if you move the helper classes to lib/models/
you should be able to get rid of this special case.
this._consumer = new BackbeatConsumer({ | ||
// zookeeper: { connectionString: this.zkConfig.connectionString }, | ||
topic: this.mongoProcessorConfig.topic, | ||
groupId: `${this.mongoProcessorConfig.groupId}1`, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why suffixing with 1
?
queueProcessor: this.processKafkaEntry.bind(this), | ||
}); | ||
this._consumer.on('ready', () => { | ||
this._consumer.subscribe(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Taylor recently added kubernetes healthcheck support, you would have to add the small support code around here to link to the healthcheck server (see changes in other backbeat components for examples, it should be almost a cut&paste).
this.logger, err => { | ||
if (err) { | ||
this.logger.error('error deleting object from mongo', | ||
{ error: err }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
{ bucket, key, error: err.message }
this.logger, err => { | ||
if (err) { | ||
this.logger.error('error putting to usersbucket', | ||
{ error: err, bucketOwnerKey }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
error: err.message
{ error: err, bucketOwnerKey }); | ||
return done(err); | ||
} | ||
this.logger.info('successfully put to usersBucket', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
usersBucket
is internal and should not be exposed in an info log. What about bucket entry put into mongo
? Not convinced entry
is the best wording, let you decide :)
return process.nextTick(done); | ||
} | ||
|
||
/** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is legacy code, please remove (now we can bootstrap by passing a bootstrap
parameter to BackbeatConsumer
class, but no need to deal with it if you don't use it in actual tests).
@@ -11,14 +11,14 @@ const RoundRobin = require('arsenal').network.RoundRobin; | |||
const BackbeatProducer = require('../../../lib/BackbeatProducer'); | |||
const BackbeatConsumer = require('../../../lib/BackbeatConsumer'); | |||
const VaultClientCache = require('../../../lib/clients/VaultClientCache'); | |||
const QueueEntry = require('../../../lib/models/QueueEntry'); | |||
const QueueEntry = require('../../utils/QueueEntry'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a good reason to move this file?
extensions/utils/RaftLogEntry.js
Outdated
* | ||
* @param {object} objectMd - object info to format entry | ||
* @param {string} bucketPrefix - prefix for bucketname to avoid name clash | ||
* @return {object} JSON.stringified entry value to be sent to kafka |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The description is a little bit off (or confusing), what is returned is a regular JS object that contains the stringified JSON kafka message as its value
attribute.
PR has been updated. Reviewers, please be cautious. |
|
ConflictThere is a conflict between your branch Please resolve the conflict on the feature branch (
|
0f9b283
to
ebf4831
Compare
PR has been updated. Reviewers, please be cautious. |
Waiting for approvalThe following approvals are needed before I can proceed with the merge:
|
PR has been updated. Reviewers, please be cautious. |
|
db9f42b
to
cf775c3
Compare
PR has been updated. Reviewers, please be cautious. |
PR has been updated. Reviewers, please be cautious. |
|
3767165
to
1b29fd8
Compare
PR has been updated. Reviewers, please be cautious. |
1b29fd8
to
5c3dae7
Compare
PR has been updated. Reviewers, please be cautious. |
1 similar comment
PR has been updated. Reviewers, please be cautious. |
|
4f104e6
to
2676ffd
Compare
PR has been updated. Reviewers, please be cautious. |
2676ffd
to
da9f219
Compare
PR has been updated. Reviewers, please be cautious. |
da9f219
to
8cdb0f7
Compare
*/ | ||
start() { | ||
this.logger.info('starting mongo queue processor'); | ||
this._mongoClient.setup(err => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think with kube environment, and use of liveness probe, this is ok since the pod would just restart, but this could fail to start in other environments since we aren't waiting on each asynchronous setup of internal components
dump: config.log.dumpLevel }); | ||
|
||
mongoQueueProcessor.start(); | ||
healthServer.start(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is missing setting onReadyCheck
callback
groupId: `${this.mongoProcessorConfig.groupId}`, | ||
// Must always have concurrency of 1 so writes are in order | ||
kafka: { hosts: '127.0.0.1:9092' }, | ||
concurrency: 1, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm rather confident we can get rid of this limit for ingestion (likely cut&paste left-over, will remove w/ @rahulreddy 's benediction)
topic: this.mongoProcessorConfig.topic, | ||
groupId: `${this.mongoProcessorConfig.groupId}`, | ||
// Must always have concurrency of 1 so writes are in order | ||
kafka: { hosts: '127.0.0.1:9092' }, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has to be configured via the global config object. Will fix
0952327
to
69343e4
Compare
Co-authored-by: Lauren Spiegel <lhspiegel@gmail.com>
69343e4
to
2c5b0a6
Compare
Queue out of orderThe changeset has received all authorizations to enter the merge queue, Please contact a member of release engineering. |
Reopening #261 with updated branch name format for Bert-e