ft: ZENKO-147 Use Redis keys instead of hash #286

Merged: 1 commit, May 31, 2018
1 change: 1 addition & 0 deletions conf/config.json
@@ -80,6 +80,7 @@
"replicationStatusTopic": "backbeat-replication-status",
"replicationFailedTopic": "backbeat-replication-failed",
"monitorReplicationFailures": true,
"monitorReplicationFailureExpiryTimeS": 86400,
"queueProcessor": {
"groupId": "backbeat-replication-group",
"retryTimeoutS": 300,
4 changes: 4 additions & 0 deletions extensions/replication/ReplicationConfigValidator.js
@@ -6,6 +6,8 @@ const { hostPortJoi, bootstrapListJoi, adminCredsJoi } =
const transportJoi = joi.alternatives().try('http', 'https')
.default('http');

const CRR_FAILURE_EXPIRY = 24 * 60 * 60; // Expire Redis keys after 24 hours.

const joiSchema = {
source: {
transport: transportJoi,
@@ -52,6 +54,8 @@ const joiSchema = {
replicationStatusTopic: joi.string().required(),
monitorReplicationFailures: joi.boolean().default(true),
replicationFailedTopic: joi.string().required(),
monitorReplicationFailureExpiryTimeS:
joi.number().default(CRR_FAILURE_EXPIRY),
queueProcessor: {
groupId: joi.string().required(),
retryTimeoutS: joi.number().default(300),
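The validator gives monitorReplicationFailureExpiryTimeS a 24-hour default, so deployments that never set the field in conf/config.json still get expiring failure keys. A minimal standalone sketch of how that default is applied (not part of the PR; the exact call style may differ across joi versions):

```js
const joi = require('joi');

const CRR_FAILURE_EXPIRY = 24 * 60 * 60; // 86400 seconds

// Reduced schema containing only the new field.
const schema = joi.object({
    monitorReplicationFailureExpiryTimeS:
        joi.number().default(CRR_FAILURE_EXPIRY),
});

// The field is omitted on purpose: validation fills in the default.
const { value } = schema.validate({});
console.log(value.monitorReplicationFailureExpiryTimeS); // 86400
```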
18 changes: 10 additions & 8 deletions extensions/replication/failedCRR/FailedCRRConsumer.js
@@ -6,7 +6,6 @@ const redisClient = require('../../replication/utils/getRedisClient')();
const FailedCRRProducer = require('./FailedCRRProducer');
const BackbeatConsumer = require('../../../lib/BackbeatConsumer');
const BackbeatTask = require('../../../lib/tasks/BackbeatTask');
const redisKeys = require('../constants').redisKeys;
const config = require('../../../conf/Config');

// BackbeatConsumer constant defaults
@@ -18,6 +17,7 @@ class FailedCRRConsumer {
* Create the retry consumer.
*/
constructor() {
this._repConfig = config.extensions.replication;
this._kafkaConfig = config.kafka;
this._topic = config.extensions.replication.replicationFailedTopic;
this.logger = new Logger('Backbeat:FailedCRRConsumer');
@@ -77,7 +77,7 @@
log.end();
return cb();
}
return this._setRedisHash(data, kafkaEntry, log, cb);
return this._setRedisKey(data, kafkaEntry, log, cb);
}

/**
@@ -90,11 +90,11 @@
* @param {Function} cb - The callback function
* @return {undefined}
*/
_setRedisHash(data, kafkaEntry, log, cb) {
_setRedisKey(data, kafkaEntry, log, cb) {
this._backbeatTask.retry({
actionDesc: 'set redis key',
logFields: {},
actionFunc: done => this._setRedisHashOnce(data, log, done),
actionFunc: done => this._setRedisKeyOnce(data, log, done),
shouldRetryFunc: err => err.retryable,
log,
}, err => {
@@ -110,14 +110,16 @@

/**
* Attempt to set the Redis hash.
* @param {Object} data - The field and value for the Redis hash
* @param {Object} data - The key and value for the Redis key
* @param {Werelogs} log - The werelogs logger
* @param {Function} cb - The callback function
* @return {undefined}
*/
_setRedisHashOnce(data, log, cb) {
const cmds = ['hmset', redisKeys.failedCRR, [data.field, data.value]];
return redisClient.batch([cmds], (err, res) => {
_setRedisKeyOnce(data, log, cb) {
const { key, value } = data;
const expiry = this._repConfig.monitorReplicationFailureExpiryTimeS;
const cmd = ['set', key, value, 'EX', expiry];
return redisClient.batch([cmd], (err, res) => {
if (err) {
return cb({ retryable: true });
}
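Storing each failure under its own key lets Redis expire entries on its own instead of a single hash growing until something prunes it. A rough sketch of what the ['set', key, value, 'EX', expiry] batch entry amounts to, assuming an ioredis-compatible client and an invented key (the real prefix comes from redisKeys.failedCRR):

```js
const Redis = require('ioredis'); // assumption: ioredis-compatible client

const redis = new Redis();
const key = '<failedCRR-prefix>:my-bucket:my-key:<encoded-version-id>:aws-location-1'; // illustrative
const value = JSON.stringify({ /* raw kafka entry */ });
const expiry = 86400; // monitorReplicationFailureExpiryTimeS

// SET key value EX expiry: the failed entry self-destructs after 24 hours.
redis.set(key, value, 'EX', expiry)
    .then(() => redis.ttl(key))
    .then(ttl => console.log('seconds left before the entry expires:', ttl));
```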
@@ -14,6 +14,7 @@ const ReplicationTaskScheduler = require('../utils/ReplicationTaskScheduler');
const UpdateReplicationStatus = require('../tasks/UpdateReplicationStatus');
const QueueEntry = require('../../../lib/models/QueueEntry');
const ObjectQueueEntry = require('../utils/ObjectQueueEntry');
const { redisKeys } = require('../constants');

/**
* @class ReplicationStatusProcessor
@@ -145,7 +146,8 @@
const versionId = queueEntry.getEncodedVersionId();
const { site } = backend;
const message = {
field: `${bucket}:${key}:${versionId}:${site}`,
key: `${redisKeys.failedCRR}:` +
`${bucket}:${key}:${versionId}:${site}`,
value: Buffer.from(kafkaEntry.value).toString(),
};
return this._FailedCRRProducer
157 changes: 90 additions & 67 deletions lib/api/BackbeatAPI.js
@@ -12,6 +12,7 @@ const QueueEntry = require('../../lib/models/QueueEntry');
const Healthcheck = require('./Healthcheck');
const routes = require('./routes');
const { redisKeys } = require('../../extensions/replication/constants');
const getFailedCRRKey = require('../util/getFailedCRRKey');
const monitoringClient = require('../clients/monitoringHandler').client;

// StatsClient constant defaults
@@ -216,57 +217,85 @@ class BackbeatAPI {
/**
* Builds the failed CRR response.
* @param {String} cursor - The Redis HSCAN cursor
* @param {Array} hashes - The collection of Redis hashes for the iteration
* @return {Object} - The response object
* @param {Array} keys - The collection of Redis keys for the iteration
* @param {Function} cb - The callback function
* @return {undefined}
*/
_getFailedCRRResponse(cursor, hashes) {
_getFailedCRRResponse(cursor, keys, cb) {
const response = {
IsTruncated: Number.parseInt(cursor, 10) !== 0,
Versions: [],
};
if (response.IsTruncated) {
response.NextMarker = Number.parseInt(cursor, 10);
}
for (let i = 0; i < hashes.length; i += 2) {
const [bucket, key, versionId, site] = hashes[i].split(':');
const entry = hashes[i + 1];
const value = JSON.parse(JSON.parse(entry).value);
response.Versions.push({
Bucket: bucket,
Key: key,
VersionId: versionId,
StorageClass: site,
Size: value['content-length'],
LastModified: value['last-modified'],
});
}
return response;
const cmds = keys.map(k => ['get', k]);
return this._redisClient.batch(cmds, (err, res) => {
if (err) {
return cb(err);
}
for (let i = 0; i < res.length; i++) {
const [cmdErr, value] = res[i];
if (cmdErr) {
return cb(cmdErr);
}
const queueEntry = QueueEntry.createFromKafkaEntry({ value });
response.Versions.push({
Bucket: queueEntry.getBucket(),
Key: queueEntry.getObjectKey(),
VersionId: queueEntry.getEncodedVersionId(),
StorageClass: queueEntry.getSite(),
Size: queueEntry.getContentLength(),
LastModified: queueEntry.getLastModified(),
});
}
return cb(null, response);
});
}
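For reference, a response assembled by this method has the following general shape; every value below is invented for illustration:

```js
const exampleResponse = {
    IsTruncated: true,   // the SCAN cursor was non-zero, so more keys may remain
    NextMarker: 42,      // cursor to pass back as the marker on the next request
    Versions: [
        {
            Bucket: 'source-bucket',
            Key: 'photos/cat.jpg',
            VersionId: '<encoded-version-id>',
            StorageClass: 'aws-location-1',
            Size: 1024,
            LastModified: '2018-05-29T18:30:00.000Z',
        },
    ],
};
```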

/**
* Find all failed CRR operations that match the bucket, key, and versionID.
* @param {Object} details - The route details
* @param {Function} cb - The callback to call
* Recursively scan all existing keys with a count of 1000. Call callback if
* the response is greater or equal to 1000 keys, or we have scanned all
* keys (i.e. when the cursor is 0).
* @param {String} pattern - The key pattern to match
* @param {Number} marker - The cursor to start scanning from
* @param {Array} allKeys - The collection of all matching keys found
* @param {Function} cb - The callback function
* @return {undefined}
*/
getFailedCRR(details, cb) {
const { bucket, key, versionId } = details;
const pattern = `${bucket}:${key}:${versionId}:*`;
const cmds =
['hscan', redisKeys.failedCRR, 0, 'MATCH', pattern, 'COUNT', 1000];
this._redisClient.batch([cmds], (err, res) => {
_scanAllKeys(pattern, marker, allKeys, cb) {
const cmd = ['scan', marker, 'MATCH', pattern, 'COUNT', 1000];
this._redisClient.batch([cmd], (err, res) => {
if (err) {
return cb(err);
}
const [cmdErr, collection] = res[0];
if (cmdErr) {
return cb(cmdErr);
}
const [cursor, hashes] = collection;
return cb(null, this._getFailedCRRResponse(cursor, hashes));
const [cursor, keys] = collection;
allKeys.push(...keys);
if (allKeys.length >= 1000 || Number.parseInt(cursor, 10) === 0) {
Contributor (on `allKeys.length >= 1000`):
If we get 1000 keys initially, won't this return before getting the next set of keys? Or was this intentional?

@bennettbuchanan (Author), May 29, 2018:
Yes, in effect it allows for a paginated response. I wanted to limit the API response listing to ~1000 keys to model it more closely on the default for version listings in S3. We cannot guarantee the exact number because Redis does not guarantee how many keys it returns during a scan, so if we have 1000 or more, the API includes a NextMarker value for subsequent listings.
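A sketch of how a caller could follow that NextMarker to drain the whole listing. getAllFailedCRR and the response fields come from this file; wrapping them in a pager like this is only an illustration, and `api` is assumed to be a BackbeatAPI instance:

```js
// Illustrative pager (not part of the PR).
function listAllFailures(api, marker, collected, cb) {
    api.getAllFailedCRR({ marker }, (err, response) => {
        if (err) {
            return cb(err);
        }
        collected.push(...response.Versions);
        if (!response.IsTruncated) {
            // Cursor reached 0: the scan is complete.
            return cb(null, collected);
        }
        // Roughly 1000 keys per page: continue from the returned cursor.
        return listAllFailures(api, response.NextMarker, collected, cb);
    });
}

// listAllFailures(backbeatAPI, 0, [], (err, versions) => { /* ... */ });
```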

return cb(null, cursor, allKeys);
}
return this._scanAllKeys(pattern, cursor, allKeys, cb);
Contributor:
I'm not familiar with the Redis API, but if asked for 1000 keys, may it return fewer than that even if there are 1000+ keys to return? In that case it looks correct to call _scanAllKeys again; otherwise it seems we could just return the results in the callback in all cases.

@bennettbuchanan (Author):
There isn't a guarantee on the number of keys returned by Redis during the SCAN operation. (I've added a point about this in the Operational Considerations section of the doc for this feature.) We do have a guarantee that the scan has completed when the returned cursor is 0, so in that case we can return whatever results are there, even if fewer than 1000.
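The SCAN contract described above can be seen in isolation: COUNT is only a hint, any single call may return far fewer keys than requested, and the one hard guarantee is that a returned cursor of 0 means the iteration is finished. A standalone sketch, assuming an ioredis-compatible client rather than the project's redisClient wrapper:

```js
const Redis = require('ioredis'); // assumption: ioredis-compatible client

const redis = new Redis();

// Collect every key matching a pattern, looping until the cursor comes back as 0.
async function scanAll(pattern) {
    const keys = [];
    let cursor = '0';
    do {
        const [nextCursor, batch] = await redis.scan(
            cursor, 'MATCH', pattern, 'COUNT', 1000);
        keys.push(...batch); // batch may hold anywhere from 0 to ~1000 keys
        cursor = nextCursor;
    } while (cursor !== '0');
    return keys;
}
```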

});
}

/**
* Find all failed CRR operations that match the bucket, key, and versionID.
* @param {Object} details - The route details
* @param {Function} cb - The callback to call
* @return {undefined}
*/
getFailedCRR(details, cb) {
const { bucket, key, versionId } = details;
const { failedCRR } = redisKeys;
const pattern = `${failedCRR}:${bucket}:${key}:${versionId}:*`;
return this._scanAllKeys(pattern, 0, [], (err, cursor, keys) =>
this._getFailedCRRResponse(cursor, keys, cb));
}

/**
* Get all CRR operations that have failed.
* @param {Object} details - The route details
@@ -275,26 +304,17 @@
*/
getAllFailedCRR(details, cb) {
const marker = Number.parseInt(details.marker, 10) || 0;
const cmds = ['hscan', redisKeys.failedCRR, marker, 'COUNT', 1000];
this._redisClient.batch([cmds], (err, res) => {
if (err) {
return cb(err);
}
const [cmdErr, collection] = res[0];
if (cmdErr) {
return cb(cmdErr);
}
const [cursor, hashes] = collection;
return cb(null, this._getFailedCRRResponse(cursor, hashes));
});
const pattern = `${redisKeys.failedCRR}:*`;
return this._scanAllKeys(pattern, marker, [], (err, cursor, keys) =>
this._getFailedCRRResponse(cursor, keys, cb));
}

/**
* For the given queue enry's site, send an entry with PENDING status to the
* replication status topic, then send an entry to the replication topic so
* that the queue processor re-attempts replication.
* For the given queue entry's site, send an entry with PENDING status to
* the replication status topic, then send an entry to the replication topic
* so that the queue processor re-attempts replication.
* @param {QueueEntry} queueEntry - The queue entry constructed from the
* failed kafka entry that was stored as a Redis hash value.
* failed kafka entry that was stored as a Redis key value.
* @param {Function} cb - The callback.
* @return {undefined}
*/
@@ -312,29 +332,27 @@
}

/**
* Delete the failed CRR Redis hash field.
* @param {String} field - The field in the hash to delete
* Delete the failed CRR Redis key.
* @param {String} key - The key to delete
* @param {Function} cb - The callback function
* @return {undefined}
*/
_deleteFailedCRRField(field, cb) {
const cmds = ['hdel', redisKeys.failedCRR, field];
return this._redisClient.batch([cmds], (err, res) => {
_deleteFailedCRRField(key, cb) {
const cmd = ['del', key];
return this._redisClient.batch([cmd], (err, res) => {
if (err) {
this._logger.error('error deleting redis hash field', {
this._logger.error('error deleting redis key', {
method: 'BackbeatAPI._deleteFailedCRRField',
key: redisKeys.failedCRR,
field,
key,
error: err,
});
return cb(err);
}
const [cmdErr] = res[0];
if (cmdErr) {
this._logger.error('error deleting redis hash field', {
this._logger.error('error deleting redis key', {
method: 'BackbeatAPI._deleteFailedCRRField',
key: redisKeys.failedCRR,
field,
key,
error: cmdErr,
});
return cb(cmdErr);
@@ -380,11 +398,12 @@
LastModified: queueEntry.getLastModified(),
ReplicationStatus: 'PENDING',
});
const field = `${Bucket}:${Key}:${VersionId}:${StorageClass}`;
return this._deleteFailedCRRField(field, err => {
const key =
getFailedCRRKey(Bucket, Key, VersionId, StorageClass);
return this._deleteFailedCRRField(key, err => {
if (err) {
this._logger.error('could not delete redis hash key ' +
'after pushing to kafka topics', {
this._logger.error('could not delete redis key after ' +
'pushing to kafka topics', {
method: 'BackbeatAPI._processFailedKafkaEntries',
error: err,
});
@@ -408,20 +427,24 @@
if (error) {
return cb(error);
}
const fields = reqBody.map(o => {
const cmds = reqBody.map(o => {
const { Bucket, Key, VersionId, StorageClass } = o;
return `${Bucket}:${Key}:${VersionId}:${StorageClass}`;
const key = getFailedCRRKey(Bucket, Key, VersionId, StorageClass);
return ['get', key];
});
const cmds = ['hmget', redisKeys.failedCRR, ...fields];
return this._redisClient.batch([cmds], (err, res) => {
return this._redisClient.batch(cmds, (err, res) => {
if (err) {
return cb(err);
}
const [cmdErr, results] = res[0];
if (cmdErr) {
return cb(cmdErr);
const entries = [];
for (let i = 0; i < res.length; i++) {
const [cmdErr, entry] = res[i];
if (cmdErr) {
return cb(cmdErr);
}
entries.push(entry);
}
return this._processFailedKafkaEntries(results, cb);
return this._processFailedKafkaEntries(entries, cb);
});
}

16 changes: 16 additions & 0 deletions lib/util/getFailedCRRKey.js
@@ -0,0 +1,16 @@
const { redisKeys } = require('../../extensions/replication/constants');

/**
* Returns the schema used for failed CRR entry Redis keys.
* @param {String} bucket - The name of the bucket
* @param {String} key - The name of the key
* @param {String} versionId - The encoded version ID
* @param {String} storageClass - The storage class of the object
* @return {String} - The Redis key used for the failed CRR entry
*/
function getFailedCRRKey(bucket, key, versionId, storageClass) {
const { failedCRR } = redisKeys;
return `${failedCRR}:${bucket}:${key}:${versionId}:${storageClass}`;
}

module.exports = getFailedCRRKey;
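A quick illustration of the key this helper builds; the inputs are hypothetical and <failedCRR> stands for whatever redisKeys.failedCRR is defined as in the replication constants:

```js
const getFailedCRRKey = require('./lib/util/getFailedCRRKey'); // path from the repo root

const key = getFailedCRRKey(
    'my-bucket', 'photos/cat.jpg', '<encoded-version-id>', 'aws-location-1');
// => '<failedCRR>:my-bucket:photos/cat.jpg:<encoded-version-id>:aws-location-1'
```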
1 change: 1 addition & 0 deletions tests/config.json
@@ -43,6 +43,7 @@
"topic": "backbeat-test-replication",
"replicationStatusTopic": "backbeat-test-replication-status",
"monitorReplicationFailures": true,
"monitorReplicationFailureExpiryTimeS": 86400,
"groupId": "backbeat-test-replication-group",
"destination": {
"bootstrapList": [