Skip to content

Commit

Permalink
ft: S3C-1398 Use retry CRR topic
Browse files Browse the repository at this point in the history
  • Loading branch information
Bennett Buchanan committed May 26, 2018
1 parent 92eb9ac commit 7dfdd46
Show file tree
Hide file tree
Showing 6 changed files with 235 additions and 52 deletions.
1 change: 1 addition & 0 deletions conf/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
},
"topic": "backbeat-replication",
"replicationStatusTopic": "backbeat-replication-status",
"replicationFailedTopic": "backbeat-replication-failed",
"queueProcessor": {
"groupId": "backbeat-replication-group",
"retryTimeoutS": 300,
Expand Down
2 changes: 1 addition & 1 deletion extensions/replication/ReplicationConfigValidator.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const joiSchema = {
},
topic: joi.string().required(),
replicationStatusTopic: joi.string().required(),
monitorReplicationFailures: joi.boolean().default(true),
replicationFailedTopic: joi.string().required(),
queueProcessor: {
groupId: joi.string().required(),
retryTimeoutS: joi.number().default(300),
Expand Down
130 changes: 130 additions & 0 deletions extensions/replication/failedCRR/FailedCRRConsumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
'use strict'; // eslint-disable-line strict

const Logger = require('werelogs').Logger;
const redisClient = require('../../replication/utils/getRedisClient')();

const FailedCRRProducer = require('./FailedCRRProducer');
const BackbeatConsumer = require('../../../lib/BackbeatConsumer');
const BackbeatTask = require('../../../lib/tasks/BackbeatTask');
const redisKeys = require('../constants').redisKeys;
const config = require('../../../conf/Config');

// BackbeatConsumer constant defaults
const CONSUMER_FETCH_MAX_BYTES = 5000020;
const CONCURRENCY = 10;

class FailedCRRConsumer {
/**
* Create the retry consumer.
*/
constructor() {
this._kafkaConfig = config.kafka;
this._topic = config.extensions.replication.replicationFailedTopic;
this.logger = new Logger('Backbeat:FailedCRRConsumer');
this._failedCRRProducer = new FailedCRRProducer(this.kafkaConfig);
this._backbeatTask = new BackbeatTask();
}

/**
* Start the retry consumer by subscribing to the retry kafka topic. Setup
* the failed CRR producer for pushing any failed redis operations back to
* the queue.
* @param {Function} cb - The callback function
* @return {undefined}
*/
start(cb) {
const consumer = new BackbeatConsumer({
kafka: { hosts: this._kafkaConfig.hosts },
topic: this._topic,
groupId: 'backbeat-retry-group',
concurrency: CONCURRENCY,
queueProcessor: this.processKafkaEntry.bind(this),
fetchMaxBytes: CONSUMER_FETCH_MAX_BYTES,
});
consumer.on('error', () => {});
consumer.on('ready', () => {
consumer.subscribe();
this.logger.info('retry consumer is ready to consume entries');
});
return this._failedCRRProducer.setupProducer(err => {
if (err) {
this.logger.error('could not setup producer', {
method: 'FailedCRRConsumer.processKafkaEntry',
error: err,
});
return cb(err);
}
return cb();
});
}

/**
* Process an entry from the retry topic, and set the data in a Redis hash.
* @param {Object} kafkaEntry - The entry from the retry topic
* @param {function} cb - The callback function
* @return {undefined}
*/
processKafkaEntry(kafkaEntry, cb) {
const log = this.logger.newRequestLogger();
let data;
try {
data = JSON.parse(kafkaEntry.value);
} catch (err) {
log.error('error processing retry entry', {
method: 'FailedCRRConsumer.processKafkaEntry',
error: err,
});
log.end();
return cb();
}
return this._setRedisHash(data, kafkaEntry, log, cb);
}

/**
* Attempt to set the Redis hash, using an exponential backoff should the
* key set fail. If the backoff time is exceeded, push the entry back into
* the retry entry topic for a later reattempt.
* @param {Object} data - The field and value for the Redis hash
* @param {Object} kafkaEntry - The entry from the retry topic
* @param {Werelogs} log - The werelogs logger
* @param {Function} cb - The callback function
* @return {undefined}
*/
_setRedisHash(data, kafkaEntry, log, cb) {
this._backbeatTask.retry({
actionDesc: 'set redis key',
logFields: {},
actionFunc: done => this._setRedisHashOnce(data, log, done),
shouldRetryFunc: err => err.retryable,
log,
}, err => {
if (err && err.retryable === true) {
log.info('publishing entry back into the kafka queue');
const entry = Buffer.from(kafkaEntry.value).toString();
return this._failedCRRProducer.publishFailedCRREntry(entry, cb);
}
log.info('successfully set redis key');
return cb();
});
}

/**
* Attempt to set the Redis hash.
* @param {Object} data - The field and value for the Redis hash
* @param {Werelogs} log - The werelogs logger
* @param {Function} cb - The callback function
* @return {undefined}
*/
_setRedisHashOnce(data, log, cb) {
const cmds = ['hmset', redisKeys.failedCRR, [data.field, data.value]];
return redisClient.batch([cmds], (err, res) => {
if (err) {
return cb({ retryable: true });
}
const [cmdErr] = res[0];
return cb({ retryable: cmdErr !== null });
});
}
}

module.exports = FailedCRRConsumer;
61 changes: 61 additions & 0 deletions extensions/replication/failedCRR/FailedCRRProducer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
'use strict'; // eslint-disable-line strict

const { Logger } = require('werelogs');

const BackbeatProducer = require('../../../lib/BackbeatProducer');
const config = require('../../../conf/Config');

class FailedCRRProducer {
/**
* Create the retry producer.
*/
constructor() {
this._kafkaConfig = config.kafka;
this._topic = config.extensions.replication.replicationFailedTopic;
this._producer = null;
this._log = new Logger('Backbeat:FailedCRRProducer');
}

/**
* Set up the retry producer.
* @param {function} [cb] - Optional callback called when startup
* is complete
* @return {undefined}
*/
setupProducer(cb) {
const producer = new BackbeatProducer({
kafka: { hosts: this._kafkaConfig.hosts },
topic: this._topic,
});
producer.once('error', () => {});
producer.once('ready', () => {
producer.removeAllListeners('error');
producer.on('error', err =>
this._log.error('error from backbeat producer', {
error: err,
}));
this._producer = producer;
if (cb) {
return cb();
}
return undefined;
});
}

/**
* Publish the given message to the retry Kafka topic.
* @param {String} message - The message to publish
* @param {Function} cb - The callback function
* @return {undefined}
*/
publishFailedCRREntry(message, cb) {
this._producer.send([{ message }], err => {
if (err) {
this._log.trace('error publishing retry entry');
}
return cb();
});
}
}

module.exports = FailedCRRProducer;
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
'use strict'; // eslint-disable-line

const http = require('http');
const async = require('async');

const Logger = require('werelogs').Logger;
const errors = require('arsenal').errors;

const BackbeatConsumer = require('../../../lib/BackbeatConsumer');
const FailedCRRProducer =
require('../../../extensions/replication/failedCRR/FailedCRRProducer');
const VaultClientCache = require('../../../lib/clients/VaultClientCache');
const ReplicationTaskScheduler = require('../utils/ReplicationTaskScheduler');
const redisClient = require('../utils/getRedisClient')();
const UpdateReplicationStatus = require('../tasks/UpdateReplicationStatus');
const QueueEntry = require('../../../lib/models/QueueEntry');
const ObjectQueueEntry = require('../utils/ObjectQueueEntry');
const { redisKeys } = require('../constants');

/**
* @class ReplicationStatusProcessor
Expand Down Expand Up @@ -92,6 +93,7 @@ class ReplicationStatusProcessor {
* @return {undefined}
*/
start(options, cb) {
this._FailedCRRProducer = new FailedCRRProducer(this.kafkaConfig);
this._consumer = new BackbeatConsumer({
kafka: { hosts: this.kafkaConfig.hosts },
topic: this.repConfig.replicationStatusTopic,
Expand All @@ -106,9 +108,7 @@ class ReplicationStatusProcessor {
this.logger.info('replication status processor is ready to ' +
'consume replication status entries');
this._consumer.subscribe();
if (cb) {
cb();
}
this._FailedCRRProducer.setupProducer(cb);
});
}

Expand All @@ -126,38 +126,32 @@ class ReplicationStatusProcessor {
}

/**
* Set the Redis hash key for each failed backend.
* Push any failed entry to the "failed" topic.
* @param {QueueEntry} queueEntry - The queue entry with the failed status.
* @param {Object} kafkaEntry - The kafka entry with the failed status.
* @param {Function} cb [description]
* @param {Function} cb - The callback function
* @return {undefined}
*/
_setFailedKeys(queueEntry, kafkaEntry, cb) {
_pushFailedEntry(queueEntry, kafkaEntry, cb) {
const { status, backends } = queueEntry.getReplicationInfo();
if (status !== 'FAILED') {
return process.nextTick(cb);
}
const bucket = queueEntry.getBucket();
const key = queueEntry.getObjectKey();
const versionId = queueEntry.getEncodedVersionId();
const fields = [];
backends.forEach(backend => {
const { status, site } = backend;
if (status === 'FAILED' && site === queueEntry.getSite()) {
const field = `${bucket}:${key}:${versionId}:${site}`;
const value = JSON.parse(kafkaEntry.value);
fields.push(field, JSON.stringify(value));
}
return undefined;
});
const cmds = ['hmset', redisKeys.failedCRR, ...fields];
return redisClient.batch([cmds], (err, res) => {
if (err) {
return cb(err);
}
const [cmdErr] = res[0];
return cb(cmdErr);
});
const backend = backends.find(b =>
b.status === 'FAILED' && b.site === queueEntry.getSite());
if (backend) {
const bucket = queueEntry.getBucket();
const key = queueEntry.getObjectKey();
const versionId = queueEntry.getEncodedVersionId();
const { site } = backend;
const message = {
field: `${bucket}:${key}:${versionId}:${site}`,
value: Buffer.from(kafkaEntry.value).toString(),
};
return this._FailedCRRProducer
.publishFailedCRREntry(JSON.stringify(message), cb);
}
return cb();
}

/**
Expand All @@ -182,20 +176,12 @@ class ReplicationStatusProcessor {
if (sourceEntry instanceof ObjectQueueEntry) {
task = new UpdateReplicationStatus(this);
}
if (task && this.repConfig.monitorReplicationFailures) {
return this._setFailedKeys(sourceEntry, kafkaEntry, err => {
if (err) {
this.logger.error('error setting redis hash key', {
error: err,
});
}
return this.taskScheduler.push({ task, entry: sourceEntry },
sourceEntry.getCanonicalKey(), done);
});
}
if (task) {
return this.taskScheduler.push({ task, entry: sourceEntry },
sourceEntry.getCanonicalKey(), done);
return async.parallel([
next => this._pushFailedEntry(sourceEntry, kafkaEntry, next),
next => this.taskScheduler.push({ task, entry: sourceEntry },
sourceEntry.getCanonicalKey(), next),
], done);
}
this.logger.warn('skipping unknown source entry',
{ entry: sourceEntry.getLogInfo() });
Expand Down
23 changes: 14 additions & 9 deletions lib/queuePopulator/QueuePopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ const RaftLogReader = require('./RaftLogReader');
const BucketFileLogReader = require('./BucketFileLogReader');
const MetricsProducer = require('../MetricsProducer');
const MetricsConsumer = require('../MetricsConsumer');
const FailedCRRConsumer =
require('../../extensions/replication/failedCRR/FailedCRRConsumer');
const MongoLogReader = require('./MongoLogReader');

class QueuePopulator {
Expand Down Expand Up @@ -69,15 +71,8 @@ class QueuePopulator {
open(cb) {
this._loadExtensions();
async.series([
next => this._setupMetricsClients(err => {
if (err) {
this.log.error('error setting up metrics client', {
method: 'QueuePopulator.open',
error: err,
});
}
return next(err);
}),
next => this._setupMetricsClients(next),
next => this._setupFailedCRRClients(next),
next => this._setupExtensions(err => {
if (err) {
this.log.error(
Expand Down Expand Up @@ -117,6 +112,16 @@ class QueuePopulator {
this._mProducer.setupProducer(cb);
}

/**
* Set up and start the consumer for retrying failed CRR operations.
* @param {Function} cb - The callback function
* @return {undefined}
*/
_setupFailedCRRClients(cb) {
this._failedCRRConsumer = new FailedCRRConsumer(this.kafkaConfig);
return this._failedCRRConsumer.start(cb);
}

/**
* Close the queue populator
* @param {function} cb - callback function
Expand Down

0 comments on commit 7dfdd46

Please sign in to comment.