
Extra validations for storage service
josh-signal committed Apr 9, 2021
1 parent e47b00a commit a2a361f
Showing 2 changed files with 99 additions and 80 deletions.
175 changes: 95 additions & 80 deletions ts/services/storage.ts
@@ -32,6 +32,7 @@ import {
import { ConversationModel } from '../models/conversations';
import { storageJobQueue } from '../util/JobQueue';
import { sleep } from '../util/sleep';
import { isMoreRecentThan } from '../util/timestamp';
import { isStorageWriteFeatureEnabled } from '../storage/isFeatureEnabled';

const {
@@ -41,7 +42,7 @@ const {

let consecutiveStops = 0;
let consecutiveConflicts = 0;
const forcedPushBucket: Array<number> = [];
const uploadBucket: Array<number> = [];

const validRecordTypes = new Set([
0, // UNKNOWN
@@ -125,6 +126,7 @@ type GeneratedManifestType = {

async function generateManifest(
version: number,
previousManifest?: ManifestRecordClass,
isNewManifest = false
): Promise<GeneratedManifestType> {
window.log.info(
@@ -138,6 +140,7 @@ async function generateManifest(
const ITEM_TYPE = window.textsecure.protobuf.ManifestRecord.Identifier.Type;

const conversationsToUpdate = [];
const insertKeys: Array<string> = [];
const deleteKeys: Array<ArrayBuffer> = [];
const manifestRecordKeys: Set<ManifestRecordIdentifierClass> = new Set();
const newItems: Set<StorageItemClass> = new Set();
@@ -206,6 +209,7 @@ async function generateManifest(
newItems.add(storageItem);

if (storageID) {
insertKeys.push(storageID);
window.log.info(
'storageService.generateManifest: new key',
conversation.idForLogging(),
@@ -343,6 +347,60 @@

storageKeyDuplicates.clear();

// If we have a copy of what the current remote manifest is then we run these
// additional validations comparing our pending manifest to the remote
// manifest:
if (previousManifest) {
const pendingInserts: Set<string> = new Set();
const pendingDeletes: Set<string> = new Set();

const remoteKeys: Set<string> = new Set();
previousManifest.keys.forEach(
(identifier: ManifestRecordIdentifierClass) => {
const storageID = arrayBufferToBase64(identifier.raw.toArrayBuffer());
remoteKeys.add(storageID);
}
);

const localKeys: Set<string> = new Set();
manifestRecordKeys.forEach((identifier: ManifestRecordIdentifierClass) => {
const storageID = arrayBufferToBase64(identifier.raw.toArrayBuffer());
localKeys.add(storageID);

if (!remoteKeys.has(storageID)) {
pendingInserts.add(storageID);
}
});

remoteKeys.forEach(storageID => {
if (!localKeys.has(storageID)) {
pendingDeletes.add(storageID);
}
});

if (deleteKeys.length !== pendingDeletes.size) {
throw new Error('invalid write delete keys length do not match');
}
if (newItems.size !== pendingInserts.size) {
throw new Error('invalid write insert items length do not match');
}
deleteKeys.forEach(key => {
const storageID = arrayBufferToBase64(key);
if (!pendingDeletes.has(storageID)) {
throw new Error(
'invalid write delete key missing from pending deletes'
);
}
});
insertKeys.forEach(storageID => {
if (!pendingInserts.has(storageID)) {
throw new Error(
'invalid write insert key missing from pending inserts'
);
}
});
}

const manifestRecord = new window.textsecure.protobuf.ManifestRecord();
manifestRecord.version = version;
manifestRecord.keys = Array.from(manifestRecordKeys);
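The validation added above reduces to a set difference between the local and remote manifest keys: anything local-only must be among the keys being inserted, anything remote-only must be among the keys being deleted, and the counts must agree. A minimal sketch of that idea, using plain base64 strings in place of protobuf identifiers (the function and parameter names here are illustrative, not from the codebase):

```ts
// Sketch only: validates a pending write against the last known remote manifest.
function validatePendingWrite(
  localKeys: ReadonlySet<string>,
  remoteKeys: ReadonlySet<string>,
  insertKeys: ReadonlyArray<string>,
  deleteKeys: ReadonlyArray<string>
): void {
  // Keys present locally but not remotely are the expected inserts.
  const pendingInserts = new Set(
    [...localKeys].filter(key => !remoteKeys.has(key))
  );
  // Keys present remotely but not locally are the expected deletes.
  const pendingDeletes = new Set(
    [...remoteKeys].filter(key => !localKeys.has(key))
  );

  if (deleteKeys.length !== pendingDeletes.size) {
    throw new Error('delete keys length does not match pending deletes');
  }
  if (insertKeys.length !== pendingInserts.size) {
    throw new Error('insert keys length does not match pending inserts');
  }
  for (const key of deleteKeys) {
    if (!pendingDeletes.has(key)) {
      throw new Error('delete key missing from pending deletes');
    }
  }
  for (const key of insertKeys) {
    if (!pendingInserts.has(key)) {
      throw new Error('insert key missing from pending inserts');
    }
  }
}
```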
@@ -488,7 +546,7 @@ async function createNewManifest() {
conversationsToUpdate,
newItems,
storageManifest,
} = await generateManifest(version, true);
} = await generateManifest(version, undefined, true);

await uploadManifest(version, {
conversationsToUpdate,
@@ -714,17 +772,7 @@ async function processManifest(
});
});

// if the remote only keys are larger or equal to our local keys then it
// was likely a forced push of storage service. We keep track of these
// merges so that we can detect possible infinite loops
const isForcePushed = remoteOnlyRecords.size >= localKeys.size;

const conflictCount = await processRemoteRecords(
remoteOnlyRecords,
isForcePushed
);

let hasConflicts = conflictCount !== 0;
const conflictCount = await processRemoteRecords(remoteOnlyRecords);

// Post-merge, if our local records contain any storage IDs that were not
// present in the remote manifest then we'll need to clear it, generate a
@@ -739,21 +787,16 @@
redactStorageID(storageID),
conversation.idForLogging()
);
conversation.set({
needsStorageServiceSync: true,
storageID: undefined,
});
conversation.unset('storageID');
updateConversation(conversation.attributes);
hasConflicts = true;
}
});

return hasConflicts;
return conflictCount !== 0;
}

async function processRemoteRecords(
remoteOnlyRecords: Map<string, RemoteRecord>,
isForcePushed = false
remoteOnlyRecords: Map<string, RemoteRecord>
): Promise<number> {
const storageKeyBase64 = window.storage.get('storageKey');
const storageKey = base64ToArrayBuffer(storageKeyBase64);
@@ -916,50 +959,6 @@ async function processRemoteRecords(
// fresh.
window.storage.put('storage-service-error-records', newRecordsWithErrors);

const now = Date.now();

if (isForcePushed) {
window.log.info(
'storageService.processRemoteRecords: remote manifest was likely force pushed',
now
);
forcedPushBucket.push(now);

// we need to check our conversations because maybe all of them were not
// updated properly, for those that weren't we'll clear their storage
// key so that they can be included in the next update
window.getConversations().forEach((conversation: ConversationModel) => {
const storageID = conversation.get('storageID');
if (storageID && !remoteOnlyRecords.has(storageID)) {
window.log.info(
'storageService.processRemoteRecords: clearing storageID',
conversation.idForLogging()
);
conversation.unset('storageID');
}
});

if (forcedPushBucket.length >= 3) {
const [firstMostRecentForcedPush] = forcedPushBucket;

if (now - firstMostRecentForcedPush < 5 * MINUTE) {
window.log.info(
'storageService.processRemoteRecords: thrasing? Backing off'
);
const error = new Error();
error.code = 'E_BACKOFF';
throw error;
}

window.log.info(
'storageService.processRemoteRecords: thrash timestamp of first -> now',
firstMostRecentForcedPush,
now
);
forcedPushBucket.shift();
}
}

if (conflictCount !== 0) {
window.log.info(
'storageService.processRemoteRecords: ' +
@@ -980,13 +979,13 @@ async function processRemoteRecords(
return 0;
}

async function sync(): Promise<void> {
async function sync(): Promise<ManifestRecordClass | undefined> {
if (!isStorageWriteFeatureEnabled()) {
window.log.info(
'storageService.sync: Not starting desktop.storage is falsey'
);

return;
return undefined;
}

if (!window.storage.get('storageKey')) {
@@ -995,6 +994,7 @@ async function sync(): Promise<void> {

window.log.info('storageService.sync: starting...');

let manifest: ManifestRecordClass | undefined;
try {
// If we've previously interacted with storage service, update 'fetchComplete' record
const previousFetchComplete = window.storage.get('storageFetchComplete');
@@ -1004,12 +1004,12 @@ async function sync(): Promise<void> {
}

const localManifestVersion = manifestFromStorage || 0;
const manifest = await fetchManifest(localManifestVersion);
manifest = await fetchManifest(localManifestVersion);

// Guarding against no manifests being returned, everything should be ok
if (!manifest) {
window.log.info('storageService.sync: no new manifest');
return;
return undefined;
}

const version = manifest.version.toNumber();
@@ -1032,16 +1032,10 @@
'storageService.sync: error processing manifest',
err && err.stack ? err.stack : String(err)
);

// When we're told to backoff, backoff to the max which should be
// ~5 minutes. If this job was running inside a queue it'll probably time
// out.
if (err.code === 'E_BACKOFF') {
await backOff(9001);
}
}

window.log.info('storageService.sync: complete');
return manifest;
}

async function upload(fromSync = false): Promise<void> {
@@ -1057,6 +1051,22 @@ async function upload(fromSync = false): Promise<void> {
throw new Error('storageService.upload: We are offline!');
}

// Rate limit uploads coming from syncing
if (fromSync) {
uploadBucket.push(Date.now());
if (uploadBucket.length >= 3) {
const [firstMostRecentWrite] = uploadBucket;

if (isMoreRecentThan(5 * MINUTE, firstMostRecentWrite)) {
throw new Error(
'storageService.uploadManifest: too many writes too soon.'
);
}

uploadBucket.shift();
}
}

if (!window.storage.get('storageKey')) {
// requesting new keys runs the sync job which will detect the conflict
// and re-run the upload job once we're merged and up-to-date.
@@ -1068,11 +1078,12 @@ async function upload(fromSync = false): Promise<void> {
return;
}

let previousManifest: ManifestRecordClass | undefined;
if (!fromSync) {
// Syncing before we upload so that we repair any unknown records and
// records with errors as well as ensure that we have the latest up to date
// manifest.
await sync();
previousManifest = await sync();
}

const localManifestVersion = window.storage.get('manifestVersion') || 0;
@@ -1084,7 +1095,7 @@ async function upload(fromSync = false): Promise<void> {
);

try {
const generatedManifest = await generateManifest(version);
const generatedManifest = await generateManifest(version, previousManifest);
await uploadManifest(version, generatedManifest);
} catch (err) {
if (err.code === 409) {
@@ -1130,7 +1141,9 @@ export const storageServiceUploadJob = debounce(() => {
return;
}

storageJobQueue(upload, `upload v${window.storage.get('manifestVersion')}`);
storageJobQueue(async () => {
await upload();
}, `upload v${window.storage.get('manifestVersion')}`);
}, 500);

export const runStorageServiceSyncJob = debounce(() => {
Expand All @@ -1141,5 +1154,7 @@ export const runStorageServiceSyncJob = debounce(() => {
return;
}

storageJobQueue(sync, `sync v${window.storage.get('manifestVersion')}`);
storageJobQueue(async () => {
await sync();
}, `sync v${window.storage.get('manifestVersion')}`);
}, 500);
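The fromSync rate limit added to upload() earlier in this file is a small sliding window over the last three write timestamps: once three sync-triggered uploads have been attempted, a fourth is refused unless the oldest of them is more than five minutes old. A minimal sketch of the same idea under illustrative names (the real code keeps the timestamps in uploadBucket and checks them with isMoreRecentThan() from ts/util/timestamp):

```ts
// Sketch only: refuse more than three writes within a five-minute window.
const MINUTE = 60 * 1000;
const writeTimestamps: Array<number> = [];

function enforceUploadRateLimit(now: number = Date.now()): void {
  writeTimestamps.push(now);
  if (writeTimestamps.length < 3) {
    return;
  }

  const [oldestRecentWrite] = writeTimestamps;
  if (now - oldestRecentWrite < 5 * MINUTE) {
    // Three writes landed inside the window; refuse this one.
    throw new Error('too many writes too soon');
  }

  // The oldest timestamp has aged out of the window; drop it and allow.
  writeTimestamps.shift();
}
```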
4 changes: 4 additions & 0 deletions ts/services/storageRecordOps.ts
@@ -506,6 +506,10 @@ export async function mergeGroupV1Record(
conversation.idForLogging()
);
} else {
if (groupV1Record.id.byteLength !== 16) {
throw new Error('Not a valid gv1');
}

conversation = await window.ConversationController.getOrCreateAndWait(
groupId,
'group'
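The check added to mergeGroupV1Record enforces that a GroupV1 id is exactly 16 bytes before a conversation is created for it. A hedged sketch of that guard in isolation (the helper name is illustrative, not from the codebase):

```ts
// Sketch only: reject anything that is not a 16-byte GroupV1 id.
function assertValidGroupV1Id(id: ArrayBuffer): void {
  if (id.byteLength !== 16) {
    throw new Error('Not a valid gv1');
  }
}
```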
