Skip to content

Commit

Permalink
Better handle large numbers of messages in cache on startup
Browse files Browse the repository at this point in the history
  • Loading branch information
scottnonnenberg-signal committed Oct 15, 2018
1 parent 655d9d1 commit 76e72f8
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 20 deletions.
11 changes: 11 additions & 0 deletions app/sql.js
Expand Up @@ -29,6 +29,7 @@ module.exports = {
getNextExpiringMessage,
getMessagesByConversation,

getUnprocessedCount,
getAllUnprocessed,
saveUnprocessed,
getUnprocessedById,
Expand Down Expand Up @@ -723,6 +724,16 @@ async function getUnprocessedById(id) {
return jsonToObject(row.json);
}

async function getUnprocessedCount() {
const row = await db.get('SELECT count(*) from unprocessed;');

if (!row) {
throw new Error('getMessageCount: Unable to get count of unprocessed');
}

return row['count(*)'];
}

async function getAllUnprocessed() {
const rows = await db.all(
'SELECT json FROM unprocessed ORDER BY timestamp ASC;'
Expand Down
5 changes: 5 additions & 0 deletions js/modules/data.js
Expand Up @@ -56,6 +56,7 @@ module.exports = {
getNextExpiringMessage,
getMessagesByConversation,

getUnprocessedCount,
getAllUnprocessed,
getUnprocessedById,
saveUnprocessed,
Expand Down Expand Up @@ -356,6 +357,10 @@ async function getNextExpiringMessage({ MessageCollection }) {
return new MessageCollection(messages);
}

async function getUnprocessedCount() {
return channels.getUnprocessedCount();
}

async function getAllUnprocessed() {
return channels.getAllUnprocessed();
}
Expand Down
6 changes: 6 additions & 0 deletions js/signal_protocol_store.js
Expand Up @@ -939,6 +939,9 @@
},

// Not yet processed messages - for resiliency
getUnprocessedCount() {
return window.Signal.Data.getUnprocessedCount();
},
getAllUnprocessed() {
return window.Signal.Data.getAllUnprocessed();
},
Expand All @@ -959,6 +962,9 @@
removeUnprocessed(id) {
return window.Signal.Data.removeUnprocessed(id, { Unprocessed });
},
removeAllUnprocessed() {
return window.Signal.Data.removeAllUnprocessed();
},
async removeAllData() {
// First the in-memory caches:
window.storage.reset(); // items store
Expand Down
47 changes: 27 additions & 20 deletions libtextsecure/message_receiver.js
Expand Up @@ -441,38 +441,45 @@ MessageReceiver.prototype.extend({
envelope.sourceDevice
} ${envelope.timestamp.toNumber()}`;
},
getAllFromCache() {
async getAllFromCache() {
window.log.info('getAllFromCache');
return textsecure.storage.unprocessed.getAll().then(items => {
window.log.info(
'getAllFromCache loaded',
items.length,
'saved envelopes'
const count = await textsecure.storage.unprocessed.getCount();

if (count > 250) {
await textsecure.storage.unprocessed.removeAll();
window.log.warn(
`There were ${count} messages in cache. Deleted all instead of reprocessing`
);
return [];
}

const items = await textsecure.storage.unprocessed.getAll();
window.log.info('getAllFromCache loaded', items.length, 'saved envelopes');

return Promise.all(
_.map(items, async item => {
const attempts = 1 + (item.attempts || 0);

return Promise.all(
_.map(items, item => {
const attempts = 1 + (item.attempts || 0);
if (attempts >= 5) {
try {
if (attempts >= 3) {
window.log.warn(
'getAllFromCache final attempt for envelope',
item.id
);
return textsecure.storage.unprocessed.remove(item.id);
await textsecure.storage.unprocessed.remove(item.id);
} else {
await textsecure.storage.unprocessed.save({ ...item, attempts });
}
return textsecure.storage.unprocessed.save({ ...item, attempts });
})
).then(
() => items,
error => {
} catch (error) {
window.log.error(
'getAllFromCache error updating items after load:',
'getAllFromCache error updating item after load:',
error && error.stack ? error.stack : error
);
return items;
}
);
});

return item;
})
);
},
async addToCache(envelope, plaintext) {
const id = this.getEnvelopeId(envelope);
Expand Down
6 changes: 6 additions & 0 deletions libtextsecure/storage/unprocessed.js
Expand Up @@ -9,6 +9,9 @@
window.textsecure.storage = window.textsecure.storage || {};

window.textsecure.storage.unprocessed = {
getCount() {
return textsecure.storage.protocol.getUnprocessedCount();
},
getAll() {
return textsecure.storage.protocol.getAllUnprocessed();
},
Expand All @@ -24,5 +27,8 @@
remove(id) {
return textsecure.storage.protocol.removeUnprocessed(id);
},
removeAll() {
return textsecure.storage.protocol.removeAllUnprocessed();
},
};
})();

0 comments on commit 76e72f8

Please sign in to comment.