Skip to content

Commit

Permalink
Increment unprocessed attempts when fetching
Browse files Browse the repository at this point in the history
  • Loading branch information
indutny-signal committed Apr 28, 2022
1 parent 6d576ed commit d6b58d2
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 41 deletions.
4 changes: 2 additions & 2 deletions ts/SignalProtocolStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1859,9 +1859,9 @@ export class SignalProtocolStore extends EventsMixin {
});
}

getAllUnprocessed(): Promise<Array<UnprocessedType>> {
getAllUnprocessedAndIncrementAttempts(): Promise<Array<UnprocessedType>> {
return this.withZone(GLOBAL_ZONE, 'getAllUnprocessed', async () => {
return window.Signal.Data.getAllUnprocessed();
return window.Signal.Data.getAllUnprocessedAndIncrementAttempts();
});
}

Expand Down
6 changes: 3 additions & 3 deletions ts/sql/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ const dataInterface: ClientInterface = {
migrateConversationMessages,

getUnprocessedCount,
getAllUnprocessed,
getAllUnprocessedAndIncrementAttempts,
getUnprocessedById,
updateUnprocessedWithData,
updateUnprocessedsWithData,
Expand Down Expand Up @@ -1443,8 +1443,8 @@ async function getUnprocessedCount() {
return channels.getUnprocessedCount();
}

async function getAllUnprocessed() {
return channels.getAllUnprocessed();
async function getAllUnprocessedAndIncrementAttempts() {
return channels.getAllUnprocessedAndIncrementAttempts();
}

async function getUnprocessedById(id: string) {
Expand Down
2 changes: 1 addition & 1 deletion ts/sql/Interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ export type DataInterface = {
) => Promise<void>;

getUnprocessedCount: () => Promise<number>;
getAllUnprocessed: () => Promise<Array<UnprocessedType>>;
getAllUnprocessedAndIncrementAttempts: () => Promise<Array<UnprocessedType>>;
updateUnprocessedWithData: (
id: string,
data: UnprocessedUpdateType
Expand Down
66 changes: 46 additions & 20 deletions ts/sql/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ const dataInterface: ServerInterface = {
migrateConversationMessages,

getUnprocessedCount,
getAllUnprocessed,
getAllUnprocessedAndIncrementAttempts,
updateUnprocessedWithData,
updateUnprocessedsWithData,
getUnprocessedById,
Expand Down Expand Up @@ -3181,32 +3181,58 @@ async function getUnprocessedCount(): Promise<number> {
return getCountFromTable(getInstance(), 'unprocessed');
}

async function getAllUnprocessed(): Promise<Array<UnprocessedType>> {
async function getAllUnprocessedAndIncrementAttempts(): Promise<
Array<UnprocessedType>
> {
const db = getInstance();

const { changes: deletedCount } = db
.prepare<Query>('DELETE FROM unprocessed WHERE timestamp < $monthAgo')
.run({
monthAgo: Date.now() - durations.MONTH,
});
return db.transaction(() => {
const { changes: deletedStaleCount } = db
.prepare<Query>('DELETE FROM unprocessed WHERE timestamp < $monthAgo')
.run({
monthAgo: Date.now() - durations.MONTH,
});

if (deletedCount !== 0) {
logger.warn(
`getAllUnprocessed: deleting ${deletedCount} old unprocessed envelopes`
);
}
if (deletedStaleCount !== 0) {
logger.warn(
'getAllUnprocessedAndIncrementAttempts: ' +
`deleting ${deletedStaleCount} old unprocessed envelopes`
);
}

const rows = db
.prepare<EmptyQuery>(
db.prepare<EmptyQuery>(
`
SELECT *
FROM unprocessed
ORDER BY timestamp ASC;
UPDATE unprocessed
SET attempts = attempts + 1
`
)
.all();
).run();

return rows;
const { changes: deletedInvalidCount } = db
.prepare<Query>(
`
DELETE FROM unprocessed
WHERE attempts >= $MAX_UNPROCESSED_ATTEMPTS
`
)
.run({ MAX_UNPROCESSED_ATTEMPTS });

if (deletedInvalidCount !== 0) {
logger.warn(
'getAllUnprocessedAndIncrementAttempts: ' +
`deleting ${deletedInvalidCount} invalid unprocessed envelopes`
);
}

return db
.prepare<EmptyQuery>(
`
SELECT *
FROM unprocessed
ORDER BY timestamp ASC;
`
)
.all();
})();
}

function removeUnprocessedsSync(ids: Array<string>): void {
Expand Down
28 changes: 22 additions & 6 deletions ts/test-electron/SignalProtocolStore_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1499,7 +1499,8 @@ describe('SignalProtocolStore', () => {
assert.equal(await store.loadSession(id), testSession);
assert.equal(await store.getSenderKey(id, distributionId), testSenderKey);

const allUnprocessed = await store.getAllUnprocessed();
const allUnprocessed =
await store.getAllUnprocessedAndIncrementAttempts();
assert.deepEqual(
allUnprocessed.map(({ envelope }) => envelope),
['second']
Expand Down Expand Up @@ -1551,7 +1552,7 @@ describe('SignalProtocolStore', () => {

assert.equal(await store.loadSession(id), testSession);
assert.equal(await store.getSenderKey(id, distributionId), testSenderKey);
assert.deepEqual(await store.getAllUnprocessed(), []);
assert.deepEqual(await store.getAllUnprocessedAndIncrementAttempts(), []);
});

it('can be re-entered', async () => {
Expand Down Expand Up @@ -1647,7 +1648,7 @@ describe('SignalProtocolStore', () => {

beforeEach(async () => {
await store.removeAllUnprocessed();
const items = await store.getAllUnprocessed();
const items = await store.getAllUnprocessedAndIncrementAttempts();
assert.strictEqual(items.length, 0);
});

Expand Down Expand Up @@ -1687,7 +1688,7 @@ describe('SignalProtocolStore', () => {
}),
]);

const items = await store.getAllUnprocessed();
const items = await store.getAllUnprocessedAndIncrementAttempts();
assert.strictEqual(items.length, 3);

// they are in the proper order because the collection comparator is 'timestamp'
Expand All @@ -1708,10 +1709,11 @@ describe('SignalProtocolStore', () => {
});
await store.updateUnprocessedWithData(id, { decrypted: 'updated' });

const items = await store.getAllUnprocessed();
const items = await store.getAllUnprocessedAndIncrementAttempts();
assert.strictEqual(items.length, 1);
assert.strictEqual(items[0].decrypted, 'updated');
assert.strictEqual(items[0].timestamp, NOW + 1);
assert.strictEqual(items[0].attempts, 1);
});

it('removeUnprocessed successfully deletes item', async () => {
Expand All @@ -1726,7 +1728,21 @@ describe('SignalProtocolStore', () => {
});
await store.removeUnprocessed(id);

const items = await store.getAllUnprocessed();
const items = await store.getAllUnprocessedAndIncrementAttempts();
assert.strictEqual(items.length, 0);
});

it('getAllUnprocessedAndIncrementAttempts deletes items', async () => {
await store.addUnprocessed({
id: '1-one',
envelope: 'first',
timestamp: NOW + 1,
receivedAtCounter: 0,
version: 2,
attempts: 3,
});

const items = await store.getAllUnprocessedAndIncrementAttempts();
assert.strictEqual(items.length, 0);
});
});
Expand Down
12 changes: 3 additions & 9 deletions ts/textsecure/MessageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -802,17 +802,11 @@ export default class MessageReceiver
return [];
}

const items = await this.storage.protocol.getAllUnprocessed();
const items =
await this.storage.protocol.getAllUnprocessedAndIncrementAttempts();
log.info('getAllFromCache loaded', items.length, 'saved envelopes');

return items.map(item => {
const { attempts = 0 } = item;

return {
...item,
attempts: attempts + 1,
};
});
return items;
}

private async decryptAndCacheBatch(
Expand Down

0 comments on commit d6b58d2

Please sign in to comment.