From a2a361f0bf4644c52f5320f5480cd55a8d803345 Mon Sep 17 00:00:00 2001 From: Josh Perez <60019601+josh-signal@users.noreply.github.com> Date: Fri, 9 Apr 2021 13:12:05 -0700 Subject: [PATCH] Extra validations for storage service --- ts/services/storage.ts | 175 +++++++++++++++++--------------- ts/services/storageRecordOps.ts | 4 + 2 files changed, 99 insertions(+), 80 deletions(-) diff --git a/ts/services/storage.ts b/ts/services/storage.ts index 4fdda71e86e..54531b40021 100644 --- a/ts/services/storage.ts +++ b/ts/services/storage.ts @@ -32,6 +32,7 @@ import { import { ConversationModel } from '../models/conversations'; import { storageJobQueue } from '../util/JobQueue'; import { sleep } from '../util/sleep'; +import { isMoreRecentThan } from '../util/timestamp'; import { isStorageWriteFeatureEnabled } from '../storage/isFeatureEnabled'; const { @@ -41,7 +42,7 @@ const { let consecutiveStops = 0; let consecutiveConflicts = 0; -const forcedPushBucket: Array = []; +const uploadBucket: Array = []; const validRecordTypes = new Set([ 0, // UNKNOWN @@ -125,6 +126,7 @@ type GeneratedManifestType = { async function generateManifest( version: number, + previousManifest?: ManifestRecordClass, isNewManifest = false ): Promise { window.log.info( @@ -138,6 +140,7 @@ async function generateManifest( const ITEM_TYPE = window.textsecure.protobuf.ManifestRecord.Identifier.Type; const conversationsToUpdate = []; + const insertKeys: Array = []; const deleteKeys: Array = []; const manifestRecordKeys: Set = new Set(); const newItems: Set = new Set(); @@ -206,6 +209,7 @@ async function generateManifest( newItems.add(storageItem); if (storageID) { + insertKeys.push(storageID); window.log.info( 'storageService.generateManifest: new key', conversation.idForLogging(), @@ -343,6 +347,60 @@ async function generateManifest( storageKeyDuplicates.clear(); + // If we have a copy of what the current remote manifest is then we run these + // additional validations comparing our pending manifest to the remote + // manifest: + if (previousManifest) { + const pendingInserts: Set = new Set(); + const pendingDeletes: Set = new Set(); + + const remoteKeys: Set = new Set(); + previousManifest.keys.forEach( + (identifier: ManifestRecordIdentifierClass) => { + const storageID = arrayBufferToBase64(identifier.raw.toArrayBuffer()); + remoteKeys.add(storageID); + } + ); + + const localKeys: Set = new Set(); + manifestRecordKeys.forEach((identifier: ManifestRecordIdentifierClass) => { + const storageID = arrayBufferToBase64(identifier.raw.toArrayBuffer()); + localKeys.add(storageID); + + if (!remoteKeys.has(storageID)) { + pendingInserts.add(storageID); + } + }); + + remoteKeys.forEach(storageID => { + if (!localKeys.has(storageID)) { + pendingDeletes.add(storageID); + } + }); + + if (deleteKeys.length !== pendingDeletes.size) { + throw new Error('invalid write delete keys length do not match'); + } + if (newItems.size !== pendingInserts.size) { + throw new Error('invalid write insert items length do not match'); + } + deleteKeys.forEach(key => { + const storageID = arrayBufferToBase64(key); + if (!pendingDeletes.has(storageID)) { + throw new Error( + 'invalid write delete key missing from pending deletes' + ); + } + }); + insertKeys.forEach(storageID => { + if (!pendingInserts.has(storageID)) { + throw new Error( + 'invalid write insert key missing from pending inserts' + ); + } + }); + } + const manifestRecord = new window.textsecure.protobuf.ManifestRecord(); manifestRecord.version = version; manifestRecord.keys = Array.from(manifestRecordKeys); @@ -488,7 +546,7 @@ async function createNewManifest() { conversationsToUpdate, newItems, storageManifest, - } = await generateManifest(version, true); + } = await generateManifest(version, undefined, true); await uploadManifest(version, { conversationsToUpdate, @@ -714,17 +772,7 @@ async function processManifest( }); }); - // if the remote only keys are larger or equal to our local keys then it - // was likely a forced push of storage service. We keep track of these - // merges so that we can detect possible infinite loops - const isForcePushed = remoteOnlyRecords.size >= localKeys.size; - - const conflictCount = await processRemoteRecords( - remoteOnlyRecords, - isForcePushed - ); - - let hasConflicts = conflictCount !== 0; + const conflictCount = await processRemoteRecords(remoteOnlyRecords); // Post-merge, if our local records contain any storage IDs that were not // present in the remote manifest then we'll need to clear it, generate a @@ -739,21 +787,16 @@ async function processManifest( redactStorageID(storageID), conversation.idForLogging() ); - conversation.set({ - needsStorageServiceSync: true, - storageID: undefined, - }); + conversation.unset('storageID'); updateConversation(conversation.attributes); - hasConflicts = true; } }); - return hasConflicts; + return conflictCount !== 0; } async function processRemoteRecords( - remoteOnlyRecords: Map, - isForcePushed = false + remoteOnlyRecords: Map ): Promise { const storageKeyBase64 = window.storage.get('storageKey'); const storageKey = base64ToArrayBuffer(storageKeyBase64); @@ -916,50 +959,6 @@ async function processRemoteRecords( // fresh. window.storage.put('storage-service-error-records', newRecordsWithErrors); - const now = Date.now(); - - if (isForcePushed) { - window.log.info( - 'storageService.processRemoteRecords: remote manifest was likely force pushed', - now - ); - forcedPushBucket.push(now); - - // we need to check our conversations because maybe all of them were not - // updated properly, for those that weren't we'll clear their storage - // key so that they can be included in the next update - window.getConversations().forEach((conversation: ConversationModel) => { - const storageID = conversation.get('storageID'); - if (storageID && !remoteOnlyRecords.has(storageID)) { - window.log.info( - 'storageService.processRemoteRecords: clearing storageID', - conversation.idForLogging() - ); - conversation.unset('storageID'); - } - }); - - if (forcedPushBucket.length >= 3) { - const [firstMostRecentForcedPush] = forcedPushBucket; - - if (now - firstMostRecentForcedPush < 5 * MINUTE) { - window.log.info( - 'storageService.processRemoteRecords: thrasing? Backing off' - ); - const error = new Error(); - error.code = 'E_BACKOFF'; - throw error; - } - - window.log.info( - 'storageService.processRemoteRecords: thrash timestamp of first -> now', - firstMostRecentForcedPush, - now - ); - forcedPushBucket.shift(); - } - } - if (conflictCount !== 0) { window.log.info( 'storageService.processRemoteRecords: ' + @@ -980,13 +979,13 @@ async function processRemoteRecords( return 0; } -async function sync(): Promise { +async function sync(): Promise { if (!isStorageWriteFeatureEnabled()) { window.log.info( 'storageService.sync: Not starting desktop.storage is falsey' ); - return; + return undefined; } if (!window.storage.get('storageKey')) { @@ -995,6 +994,7 @@ async function sync(): Promise { window.log.info('storageService.sync: starting...'); + let manifest: ManifestRecordClass | undefined; try { // If we've previously interacted with strage service, update 'fetchComplete' record const previousFetchComplete = window.storage.get('storageFetchComplete'); @@ -1004,12 +1004,12 @@ async function sync(): Promise { } const localManifestVersion = manifestFromStorage || 0; - const manifest = await fetchManifest(localManifestVersion); + manifest = await fetchManifest(localManifestVersion); // Guarding against no manifests being returned, everything should be ok if (!manifest) { window.log.info('storageService.sync: no new manifest'); - return; + return undefined; } const version = manifest.version.toNumber(); @@ -1032,16 +1032,10 @@ async function sync(): Promise { 'storageService.sync: error processing manifest', err && err.stack ? err.stack : String(err) ); - - // When we're told to backoff, backoff to the max which should be - // ~5 minutes. If this job was running inside a queue it'll probably time - // out. - if (err.code === 'E_BACKOFF') { - await backOff(9001); - } } window.log.info('storageService.sync: complete'); + return manifest; } async function upload(fromSync = false): Promise { @@ -1057,6 +1051,22 @@ async function upload(fromSync = false): Promise { throw new Error('storageService.upload: We are offline!'); } + // Rate limit uploads coming from syncing + if (fromSync) { + uploadBucket.push(Date.now()); + if (uploadBucket.length >= 3) { + const [firstMostRecentWrite] = uploadBucket; + + if (isMoreRecentThan(5 * MINUTE, firstMostRecentWrite)) { + throw new Error( + 'storageService.uploadManifest: too many writes too soon.' + ); + } + + uploadBucket.shift(); + } + } + if (!window.storage.get('storageKey')) { // requesting new keys runs the sync job which will detect the conflict // and re-run the upload job once we're merged and up-to-date. @@ -1068,11 +1078,12 @@ async function upload(fromSync = false): Promise { return; } + let previousManifest: ManifestRecordClass | undefined; if (!fromSync) { // Syncing before we upload so that we repair any unknown records and // records with errors as well as ensure that we have the latest up to date // manifest. - await sync(); + previousManifest = await sync(); } const localManifestVersion = window.storage.get('manifestVersion') || 0; @@ -1084,7 +1095,7 @@ async function upload(fromSync = false): Promise { ); try { - const generatedManifest = await generateManifest(version); + const generatedManifest = await generateManifest(version, previousManifest); await uploadManifest(version, generatedManifest); } catch (err) { if (err.code === 409) { @@ -1130,7 +1141,9 @@ export const storageServiceUploadJob = debounce(() => { return; } - storageJobQueue(upload, `upload v${window.storage.get('manifestVersion')}`); + storageJobQueue(async () => { + await upload(); + }, `upload v${window.storage.get('manifestVersion')}`); }, 500); export const runStorageServiceSyncJob = debounce(() => { @@ -1141,5 +1154,7 @@ export const runStorageServiceSyncJob = debounce(() => { return; } - storageJobQueue(sync, `sync v${window.storage.get('manifestVersion')}`); + storageJobQueue(async () => { + await sync(); + }, `sync v${window.storage.get('manifestVersion')}`); }, 500); diff --git a/ts/services/storageRecordOps.ts b/ts/services/storageRecordOps.ts index 1a5ff73981e..67c4b57c155 100644 --- a/ts/services/storageRecordOps.ts +++ b/ts/services/storageRecordOps.ts @@ -506,6 +506,10 @@ export async function mergeGroupV1Record( conversation.idForLogging() ); } else { + if (groupV1Record.id.byteLength !== 16) { + throw new Error('Not a valid gv1'); + } + conversation = await window.ConversationController.getOrCreateAndWait( groupId, 'group'