From 76e72f871a94a904bde347ab393f6a0f668d9f95 Mon Sep 17 00:00:00 2001 From: Scott Nonnenberg Date: Fri, 28 Sep 2018 15:51:26 -0700 Subject: [PATCH] Better handle large numbers of messages in cache on startup --- app/sql.js | 11 +++++++ js/modules/data.js | 5 +++ js/signal_protocol_store.js | 6 ++++ libtextsecure/message_receiver.js | 47 ++++++++++++++++------------ libtextsecure/storage/unprocessed.js | 6 ++++ 5 files changed, 55 insertions(+), 20 deletions(-) diff --git a/app/sql.js b/app/sql.js index 9763698ea4d..1e7262da587 100644 --- a/app/sql.js +++ b/app/sql.js @@ -29,6 +29,7 @@ module.exports = { getNextExpiringMessage, getMessagesByConversation, + getUnprocessedCount, getAllUnprocessed, saveUnprocessed, getUnprocessedById, @@ -723,6 +724,16 @@ async function getUnprocessedById(id) { return jsonToObject(row.json); } +async function getUnprocessedCount() { + const row = await db.get('SELECT count(*) from unprocessed;'); + + if (!row) { + throw new Error('getMessageCount: Unable to get count of unprocessed'); + } + + return row['count(*)']; +} + async function getAllUnprocessed() { const rows = await db.all( 'SELECT json FROM unprocessed ORDER BY timestamp ASC;' diff --git a/js/modules/data.js b/js/modules/data.js index cb9a4174674..368693312c2 100644 --- a/js/modules/data.js +++ b/js/modules/data.js @@ -56,6 +56,7 @@ module.exports = { getNextExpiringMessage, getMessagesByConversation, + getUnprocessedCount, getAllUnprocessed, getUnprocessedById, saveUnprocessed, @@ -356,6 +357,10 @@ async function getNextExpiringMessage({ MessageCollection }) { return new MessageCollection(messages); } +async function getUnprocessedCount() { + return channels.getUnprocessedCount(); +} + async function getAllUnprocessed() { return channels.getAllUnprocessed(); } diff --git a/js/signal_protocol_store.js b/js/signal_protocol_store.js index 70dd0cdca65..5b032265ee6 100644 --- a/js/signal_protocol_store.js +++ b/js/signal_protocol_store.js @@ -939,6 +939,9 @@ }, // Not yet processed messages - for resiliency + getUnprocessedCount() { + return window.Signal.Data.getUnprocessedCount(); + }, getAllUnprocessed() { return window.Signal.Data.getAllUnprocessed(); }, @@ -959,6 +962,9 @@ removeUnprocessed(id) { return window.Signal.Data.removeUnprocessed(id, { Unprocessed }); }, + removeAllUnprocessed() { + return window.Signal.Data.removeAllUnprocessed(); + }, async removeAllData() { // First the in-memory caches: window.storage.reset(); // items store diff --git a/libtextsecure/message_receiver.js b/libtextsecure/message_receiver.js index b1ac693514b..7ea1e91edc2 100644 --- a/libtextsecure/message_receiver.js +++ b/libtextsecure/message_receiver.js @@ -441,38 +441,45 @@ MessageReceiver.prototype.extend({ envelope.sourceDevice } ${envelope.timestamp.toNumber()}`; }, - getAllFromCache() { + async getAllFromCache() { window.log.info('getAllFromCache'); - return textsecure.storage.unprocessed.getAll().then(items => { - window.log.info( - 'getAllFromCache loaded', - items.length, - 'saved envelopes' + const count = await textsecure.storage.unprocessed.getCount(); + + if (count > 250) { + await textsecure.storage.unprocessed.removeAll(); + window.log.warn( + `There were ${count} messages in cache. Deleted all instead of reprocessing` ); + return []; + } + + const items = await textsecure.storage.unprocessed.getAll(); + window.log.info('getAllFromCache loaded', items.length, 'saved envelopes'); + + return Promise.all( + _.map(items, async item => { + const attempts = 1 + (item.attempts || 0); - return Promise.all( - _.map(items, item => { - const attempts = 1 + (item.attempts || 0); - if (attempts >= 5) { + try { + if (attempts >= 3) { window.log.warn( 'getAllFromCache final attempt for envelope', item.id ); - return textsecure.storage.unprocessed.remove(item.id); + await textsecure.storage.unprocessed.remove(item.id); + } else { + await textsecure.storage.unprocessed.save({ ...item, attempts }); } - return textsecure.storage.unprocessed.save({ ...item, attempts }); - }) - ).then( - () => items, - error => { + } catch (error) { window.log.error( - 'getAllFromCache error updating items after load:', + 'getAllFromCache error updating item after load:', error && error.stack ? error.stack : error ); - return items; } - ); - }); + + return item; + }) + ); }, async addToCache(envelope, plaintext) { const id = this.getEnvelopeId(envelope); diff --git a/libtextsecure/storage/unprocessed.js b/libtextsecure/storage/unprocessed.js index 4990c8e54cd..91bc0e255e5 100644 --- a/libtextsecure/storage/unprocessed.js +++ b/libtextsecure/storage/unprocessed.js @@ -9,6 +9,9 @@ window.textsecure.storage = window.textsecure.storage || {}; window.textsecure.storage.unprocessed = { + getCount() { + return textsecure.storage.protocol.getUnprocessedCount(); + }, getAll() { return textsecure.storage.protocol.getAllUnprocessed(); }, @@ -24,5 +27,8 @@ remove(id) { return textsecure.storage.protocol.removeUnprocessed(id); }, + removeAll() { + return textsecure.storage.protocol.removeAllUnprocessed(); + }, }; })();