Skip to content

Commit

Permalink
fix(#8566): adds sentinel transition retry cache (#8563)
Browse files Browse the repository at this point in the history
Adds a cache of 1000 entries (lifo) that tracks how many times Sentinel tried to run transition over a specific change (id + rev). Once the retry count is over 5, the change is skipped and the sentinel queue goes past it.

#8566
  • Loading branch information
dianabarsan authored and Benmuiruri committed Oct 26, 2023
1 parent ab5e252 commit 712c593
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 2 deletions.
40 changes: 40 additions & 0 deletions sentinel/src/lib/change-retry-history.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
const MAX_HISTORY = 1000;
const MAX_RETRIES = 5;
const historyKeys = [];
const history = {};

const getKey = (change) => `${change.id}${change.changes?.[0]?.rev}`;

const add = (change) => {
if (!change) {
return;
}
const key = getKey(change);

if (history[key]) {
history[key]++;
} else {
historyKeys.push(key);
history[key] = 1;
}

if (historyKeys.length > MAX_HISTORY) {
const deletedKey = historyKeys.shift();
delete history[deletedKey];
}
};

const shouldProcess = (change) => {
if (!change) {
return false;
}

const key = getKey(change);

return !history[key] || history[key] <= MAX_RETRIES;
};

module.exports = {
add,
shouldProcess,
};
9 changes: 7 additions & 2 deletions sentinel/src/lib/feed.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ const async = require('async');
const logger = require('./logger');
const db = require('../db');
const metadata = require('./metadata');
const changeRetryHistory = require('./change-retry-history');
const tombstoneUtils = require('@medic/tombstone-utils');
const transitionsLib = require('../config').getTransitionsLib();

Expand Down Expand Up @@ -43,8 +44,11 @@ const registerFeed = (seq) => {
request = db.medic
.changes({ live: true, since: seq })
.on('change', change => {
if (!change.id.match(IDS_TO_IGNORE) &&
!tombstoneUtils.isTombstoneId(change.id)) {
if (
!change.id.match(IDS_TO_IGNORE) &&
!tombstoneUtils.isTombstoneId(change.id) &&
changeRetryHistory.shouldProcess(change)
) {
enqueue(change);

const queueSize = changeQueue.length();
Expand Down Expand Up @@ -79,6 +83,7 @@ const changeQueue = async.queue((change, callback) => {

transitionsLib.processChange(change, err => {
if (err) {
changeRetryHistory.add(change);
return callback(err);
}

Expand Down
126 changes: 126 additions & 0 deletions sentinel/tests/unit/lib/change-retry-history.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
const sinon = require('sinon');
const { expect } = require('chai');
const rewire = require('rewire');

let changeRetryHistory;

describe('change-retry-history', () => {
beforeEach(() => {
changeRetryHistory = rewire('../../../src/lib/change-retry-history');
});

afterEach(() => {
sinon.restore();
});

describe('add', () => {
it('should not throw errors on bad changes', () => {
changeRetryHistory.add();
changeRetryHistory.add(false);
changeRetryHistory.add('string');
changeRetryHistory.add(23);
changeRetryHistory.add({});
changeRetryHistory.add({ id: 'test', changes: false });
expect(changeRetryHistory.__get__('historyKeys').length).to.equal(2);
});

it('should add key to history', () => {
changeRetryHistory.add({ id: 'theid', changes: [{ rev: '22-rev' }] });
expect(changeRetryHistory.__get__('historyKeys')).to.deep.equal(['theid22-rev']);
expect(changeRetryHistory.__get__('history')).to.deep.equal({ 'theid22-rev': 1 });

changeRetryHistory.add({ id: 'theid', changes: [{ rev: '23-rev' }] });
expect(changeRetryHistory.__get__('historyKeys')).to.deep.equal(['theid22-rev', 'theid23-rev']);
expect(changeRetryHistory.__get__('history')).to.deep.equal({ 'theid22-rev': 1, 'theid23-rev': 1 });
});

it('should increase number when re-adding', () => {
changeRetryHistory.add({ id: 'phil', changes: [{ rev: '11-77' }] });
expect(changeRetryHistory.__get__('historyKeys')).to.deep.equal(['phil11-77']);
expect(changeRetryHistory.__get__('history')).to.deep.equal({ 'phil11-77': 1 });


changeRetryHistory.add({ id: 'phil', changes: [{ rev: '11-77' }] });
expect(changeRetryHistory.__get__('historyKeys')).to.deep.equal(['phil11-77']);
expect(changeRetryHistory.__get__('history')).to.deep.equal({ 'phil11-77': 2 });

changeRetryHistory.add({ id: 'phil', changes: [{ rev: '11-77' }] });
expect(changeRetryHistory.__get__('historyKeys')).to.deep.equal(['phil11-77']);
expect(changeRetryHistory.__get__('history')).to.deep.equal({ 'phil11-77': 3 });

changeRetryHistory.add({ id: 'phil', changes: [{ rev: '11-77' }] });
expect(changeRetryHistory.__get__('history')).to.deep.equal({ 'phil11-77': 4 });
});

it('should push old changes out of queue', () => {
Array
.from({ length: 1000 })
.forEach((_, idx) => changeRetryHistory.add({ id: `${idx + 1}`, changes: [{ rev: `${idx + 1}` }] }));

expect(changeRetryHistory.__get__('historyKeys').length).to.equal(1000);

changeRetryHistory.add({ id: 'george', changes: [{ rev: '22-77' }] });
expect(changeRetryHistory.__get__('historyKeys').length).to.equal(1000);
expect(changeRetryHistory.__get__('historyKeys')[0]).to.equal('22');
expect(changeRetryHistory.__get__('historyKeys')[1]).to.equal('33');
expect(changeRetryHistory.__get__('historyKeys')[999]).to.equal('george22-77');

changeRetryHistory.add({ id: 'michael', changes: [{ rev: '33-77' }] });
expect(changeRetryHistory.__get__('historyKeys').length).to.equal(1000);
expect(changeRetryHistory.__get__('historyKeys')[0]).to.equal('33');
expect(changeRetryHistory.__get__('historyKeys')[1]).to.equal('44');
expect(changeRetryHistory.__get__('historyKeys')[998]).to.equal('george22-77');
expect(changeRetryHistory.__get__('historyKeys')[999]).to.equal('michael33-77');
});
});

describe('shouldProcess', () => {
it('should not throw errors on bad changes', () => {
expect(changeRetryHistory.shouldProcess()).to.equal(false);
changeRetryHistory.shouldProcess(false);
changeRetryHistory.shouldProcess('string');
changeRetryHistory.shouldProcess(23);
changeRetryHistory.shouldProcess({});
changeRetryHistory.shouldProcess({ id: 'test', changes: false });
});

it('should return true when item is not present', () => {
const change = { id: 'a', changes: [{ rev: '33-77' }] };
expect(changeRetryHistory.shouldProcess(change)).to.equal(true);
changeRetryHistory.add({ id: 'george', changes: [{ rev: '22-77' }] });
expect(changeRetryHistory.shouldProcess(change)).to.equal(true);
changeRetryHistory.add({ id: 'michael', changes: [{ rev: '33-77' }] });
expect(changeRetryHistory.shouldProcess(change)).to.equal(true);
});

it('should return true when item is present but does not exceed limit', () => {
const change = { id: 'a', changes: [{ rev: '33-77' }] };
changeRetryHistory.add(change);
expect(changeRetryHistory.shouldProcess(change)).to.equal(true);
changeRetryHistory.add(change);
expect(changeRetryHistory.shouldProcess(change)).to.equal(true);
changeRetryHistory.add(change);
expect(changeRetryHistory.shouldProcess(change)).to.equal(true);
changeRetryHistory.add(change);
expect(changeRetryHistory.shouldProcess(change)).to.equal(true);
changeRetryHistory.add(change);
expect(changeRetryHistory.shouldProcess(change)).to.equal(true);
changeRetryHistory.add(change);
expect(changeRetryHistory.shouldProcess(change)).to.equal(false);
});

it('should return true when item is present and new rev does not exceed limit', () => {
const change = { id: 'a', changes: [{ rev: '33-77' }] };
changeRetryHistory.add(change);
changeRetryHistory.add(change);
changeRetryHistory.add(change);
changeRetryHistory.add(change);
changeRetryHistory.add(change);
changeRetryHistory.add(change);

const newRevChange = { id: 'a', changes: [{ rev: '34-77' }] };
expect(changeRetryHistory.shouldProcess(newRevChange)).to.equal(true);
expect(changeRetryHistory.shouldProcess(change)).to.equal(false);
});
});
});
23 changes: 23 additions & 0 deletions sentinel/tests/unit/lib/feed.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const feed = require('../../../src/lib/feed');
const metadata = require('../../../src/lib/metadata');
const logger = require('../../../src/lib/logger');
const tombstoneUtils = require('@medic/tombstone-utils');
const changeRetryHistory = require('../../../src/lib/change-retry-history');

describe('feed', () => {

Expand Down Expand Up @@ -183,6 +184,28 @@ describe('feed', () => {
});
});

it('should skip docs that should not be retried', () => {
const doc1 = { id: 'some-uuid' };
const doc2 = { id: 'other-uuid' };
sinon.stub(metadata, 'getTransitionSeq').resolves('123');
sinon.stub(changeRetryHistory, 'shouldProcess')
.withArgs(doc1).returns(false)
.withArgs(doc2).returns(true);

const push = sinon.stub(feed._changeQueue, 'push');
return feed
.listen()
.then(() => {
const callbackFn = handler.on.args[0][1];
callbackFn(doc1);
callbackFn(doc2);
})
.then(() => {
chai.expect(push.callCount).to.equal(1);
chai.expect(push.args[0][0]).to.deep.equal(doc2);
});
});

it('stops listening when the number of changes in the queue is above the limit', () => {
const change = { id: 'some-uuid' };
sinon.stub(metadata, 'getTransitionSeq').resolves('123');
Expand Down

0 comments on commit 712c593

Please sign in to comment.