Skip to content

Commit

Permalink
fix: first working test ios to desktop
Browse files Browse the repository at this point in the history
still have some tests to fix
  • Loading branch information
Bilb committed Oct 12, 2023
1 parent ceffa1e commit 9492fdc
Show file tree
Hide file tree
Showing 8 changed files with 505 additions and 65 deletions.
34 changes: 5 additions & 29 deletions ts/receiver/contentMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,10 @@ import { assertUnreachable } from '../types/sqlSharedTypes';
import { BlockedNumberController } from '../util';
import { ReadReceipts } from '../util/readReceipts';
import { Storage } from '../util/storage';
import { ContactsWrapperActions } from '../webworker/workers/browser/libsession_worker_interface';
import { handleCallMessage } from './callMessage';
import { getAllCachedECKeyPair, sentAtMoreRecentThanWrapper } from './closedGroups';
import { ECKeyPair } from './keypairs';
import {
ContactsWrapperActions,
MetaGroupWrapperActions,
} from '../webworker/workers/browser/libsession_worker_interface';
import { PreConditionFailed } from '../session/utils/errors';

export async function handleSwarmContentMessage(envelope: EnvelopePlus, messageHash: string) {
try {
Expand All @@ -58,27 +54,6 @@ export async function handleSwarmContentMessage(envelope: EnvelopePlus, messageH
}
}

async function decryptForGroupV2(envelope: EnvelopePlus) {
window?.log?.info('received closed group message v2');
try {
const groupPk = envelope.source;
if (!PubKey.isClosedGroupV2(groupPk)) {
throw new PreConditionFailed('decryptForGroupV2: not a 03 prefixed group');
}

const decrypted = await MetaGroupWrapperActions.decryptMessage(groupPk, envelope.content);

// the receiving pipeline relies on the envelope.senderIdentity field to know who is the author of a message
// eslint-disable-next-line no-param-reassign
envelope.senderIdentity = decrypted.pubkeyHex;

return decrypted.plaintext;
} catch (e) {
window.log.warn('failed to decrypt message with error: ', e.message);
return null;
}
}

async function decryptForClosedGroup(envelope: EnvelopePlus) {
window?.log?.info('received closed group message');
try {
Expand Down Expand Up @@ -291,10 +266,11 @@ async function decrypt(envelope: EnvelopePlus): Promise<any> {
break;
case SignalService.Envelope.Type.CLOSED_GROUP_MESSAGE:
if (PubKey.isClosedGroupV2(envelope.source)) {
plaintext = await decryptForGroupV2(envelope);
} else {
plaintext = await decryptForClosedGroup(envelope);
// groupv2 messages are decrypted way earlier than this via libsession, and what we get here is already decrypted
return envelope.content;
}
plaintext = await decryptForClosedGroup(envelope);

break;
default:
assertUnreachable(envelope.type, `Unknown message type:${envelope.type}`);
Expand Down
18 changes: 11 additions & 7 deletions ts/receiver/receiver.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* eslint-disable more/no-then */
import _, { isEmpty, last } from 'lodash';
import { v4 as uuidv4 } from 'uuid';
import _ from 'lodash';

import { EnvelopePlus } from './types';

Expand Down Expand Up @@ -70,18 +70,22 @@ function queueSwarmEnvelope(envelope: EnvelopePlus, messageHash: string) {
}
}

function contentIsEnvelope(content: Uint8Array | EnvelopePlus): content is EnvelopePlus {
return !isEmpty((content as EnvelopePlus).content);
}

async function handleRequestDetail(
plaintext: Uint8Array,
data: Uint8Array | EnvelopePlus,
inConversation: string | null,
lastPromise: Promise<any>,
messageHash: string
): Promise<void> {
const envelope: any = SignalService.Envelope.decode(plaintext);
const envelope: any = contentIsEnvelope(data) ? data : SignalService.Envelope.decode(data);

// After this point, decoding errors are not the server's
// fault, and we should handle them gracefully and tell the
// user they received an invalid message
// The message is for a medium size group
// The message is for a group
if (inConversation) {
const ourNumber = UserUtils.getOurPubKeyStrFromCache();
const senderIdentity = envelope.source;
Expand All @@ -95,7 +99,7 @@ async function handleRequestDetail(
envelope.source = inConversation;

// eslint-disable-next-line no-param-reassign
plaintext = SignalService.Envelope.encode(envelope).finish();
data = SignalService.Envelope.encode(envelope).finish();
envelope.senderIdentity = senderIdentity;
}

Expand All @@ -109,7 +113,7 @@ async function handleRequestDetail(
// need to handle senderIdentity separately)...
perfStart(`addToCache-${envelope.id}`);

await addToCache(envelope, plaintext, messageHash);
await addToCache(envelope, contentIsEnvelope(data) ? data.content : data, messageHash);
perfEnd(`addToCache-${envelope.id}`, 'addToCache');

// To ensure that we queue in the same order we receive messages
Expand All @@ -133,7 +137,7 @@ export function handleRequest(
inConversation: string | null,
messageHash: string
): void {
const lastPromise = _.last(incomingMessagePromises) || Promise.resolve();
const lastPromise = last(incomingMessagePromises) || Promise.resolve();

const promise = handleRequestDetail(plaintext, inConversation, lastPromise, messageHash).catch(
e => {
Expand Down
82 changes: 63 additions & 19 deletions ts/session/apis/snode_api/swarmPolling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
/* eslint-disable @typescript-eslint/no-misused-promises */
import { GroupPubkeyType } from 'libsession_util_nodejs';
import { compact, concat, difference, flatten, isArray, last, sample, uniqBy } from 'lodash';
import { v4 } from 'uuid';
import { Data, Snode } from '../../../data/data';
import { SignalService } from '../../../protobuf';
import * as Receiver from '../../../receiver/receiver';
Expand All @@ -12,6 +13,8 @@ import * as snodePool from './snodePool';

import { ConversationModel } from '../../../models/conversation';
import { ConversationTypeEnum } from '../../../models/conversationAttributes';
import { signalservice } from '../../../protobuf/compiled';
import { EnvelopePlus } from '../../../receiver/types';
import { updateIsOnline } from '../../../state/ducks/onion';
import { assertUnreachable } from '../../../types/sqlSharedTypes';
import {
Expand All @@ -24,6 +27,7 @@ import { ConvoHub } from '../../conversations';
import { ed25519Str } from '../../onions/onionPath';
import { StringUtils, UserUtils } from '../../utils';
import { perfEnd, perfStart } from '../../utils/Performance';
import { PreConditionFailed } from '../../utils/errors';
import { LibSessionUtil } from '../../utils/libsession/libsession_utils';
import { SnodeNamespace, SnodeNamespaces, UserConfigNamespaces } from './namespaces';
import { PollForGroup, PollForLegacy, PollForUs } from './pollingTypes';
Expand All @@ -37,25 +41,17 @@ import {
RetrieveRequestResult,
} from './types';

export function extractWebSocketContent(
message: string,
messageHash: string
): null | {
body: Uint8Array;
messageHash: string;
} {
export function extractWebSocketContent(message: string): null | Uint8Array {
try {
const dataPlaintext = new Uint8Array(StringUtils.encode(message, 'base64'));
const messageBuf = SignalService.WebSocketMessage.decode(dataPlaintext);
if (
messageBuf.type === SignalService.WebSocketMessage.Type.REQUEST &&
messageBuf.request?.body?.length
) {
return {
body: messageBuf.request.body,
messageHash,
};
return messageBuf.request.body;
}

return null;
} catch (error) {
window?.log?.warn('extractWebSocketContent from message failed with:', error.message);
Expand Down Expand Up @@ -374,21 +370,38 @@ export class SwarmPolling {
perfStart(`handleSeenMessages-${pubkey}`);
const newMessages = await this.handleSeenMessages(uniqOtherMsgs);
perfEnd(`handleSeenMessages-${pubkey}`, 'handleSeenMessages');
if (type === ConversationTypeEnum.GROUPV3) {
for (let index = 0; index < newMessages.length; index++) {
const msg = newMessages[index];
const retrieveResult = new Uint8Array(StringUtils.encode(msg.data, 'base64'));
try {
const envelopePlus = await decryptForGroupV2({
content: retrieveResult,
groupPk: pubkey,
sentTimestamp: msg.timestamp,
});
if (!envelopePlus) {
throw new Error('decryptForGroupV2 returned empty envelope');
}

// this is the processing of the message itself, which can be long.
Receiver.handleRequest(envelopePlus.content, envelopePlus.source, msg.hash);
} catch (e) {
window.log.warn('failed to handle groupv2 otherMessage because of: ', e.message);
}
}
return;
}

// trigger the handling of all the other messages, not shared config related
newMessages.forEach(m => {
const content = extractWebSocketContent(m.data, m.hash);
const content = extractWebSocketContent(m.data);

if (!content) {
return;
}

Receiver.handleRequest(
content.body,
type === ConversationTypeEnum.GROUP || type === ConversationTypeEnum.GROUPV3
? pubkey
: null,
content.messageHash
);
Receiver.handleRequest(content, type === ConversationTypeEnum.GROUP ? pubkey : null, m.hash);
});
}

Expand Down Expand Up @@ -721,3 +734,34 @@ function filterMessagesPerTypeOfConvo<T extends ConversationTypeEnum>(
return { confMessages: null, otherMessages: [] };
}
}

async function decryptForGroupV2(retrieveResult: {
groupPk: string;
content: Uint8Array;
sentTimestamp: number;
}): Promise<EnvelopePlus | null> {
window?.log?.info('received closed group message v2');
try {
const groupPk = retrieveResult.groupPk;
if (!PubKey.isClosedGroupV2(groupPk)) {
throw new PreConditionFailed('decryptForGroupV2: not a 03 prefixed group');
}

const decrypted = await MetaGroupWrapperActions.decryptMessage(groupPk, retrieveResult.content);
const envelopePlus: EnvelopePlus = {
id: v4(),
senderIdentity: decrypted.pubkeyHex,
receivedAt: Date.now(),
content: decrypted.plaintext,
source: groupPk,
type: signalservice.Envelope.Type.CLOSED_GROUP_MESSAGE,
timestamp: retrieveResult.sentTimestamp,
};
// the receiving pipeline relies on the envelope.senderIdentity field to know who is the author of a message

return envelopePlus;
} catch (e) {
window.log.warn('failed to decrypt message with error: ', e.message);
return null;
}
}
53 changes: 50 additions & 3 deletions ts/session/sending/MessageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,50 @@ type EncryptAndWrapMessageResults = {
namespace: number;
} & SharedEncryptAndWrap;

async function encryptForGroupV2(
params: EncryptAndWrapMessage
): Promise<EncryptAndWrapMessageResults> {
// Group v2 encryption works a bit differently: we encrypt the envelope itself through libsession.
// We essentially need to do the opposite of the usual encryption which is send envelope unencrypted with content encrypted.
const {
destination,
identifier,
isSyncMessage: syncMessage,
namespace,
plainTextBuffer,
ttl,
} = params;

const { overRiddenTimestampBuffer, networkTimestamp } =
overwriteOutgoingTimestampWithNetworkTimestamp({ plainTextBuffer });
const envelope = await buildEnvelope(
SignalService.Envelope.Type.CLOSED_GROUP_MESSAGE,
destination,
networkTimestamp,
overRiddenTimestampBuffer
);

const recipient = PubKey.cast(destination);

const { cipherText } = await MessageEncrypter.encrypt(
recipient,
SignalService.Envelope.encode(envelope).finish(),
encryptionBasedOnConversation(recipient)
);

const data64 = ByteBuffer.wrap(cipherText).toString('base64');

return {
data64,
networkTimestamp,
data: cipherText,
namespace,
ttl,
identifier,
isSyncMessage: syncMessage,
};
}

async function encryptMessageAndWrap(
params: EncryptAndWrapMessage
): Promise<EncryptAndWrapMessageResults> {
Expand All @@ -319,6 +363,10 @@ async function encryptMessageAndWrap(
ttl,
} = params;

if (PubKey.isClosedGroupV2(destination)) {
return encryptForGroupV2(params);
}

const { overRiddenTimestampBuffer, networkTimestamp } =
overwriteOutgoingTimestampWithNetworkTimestamp({ plainTextBuffer });
const recipient = PubKey.cast(destination);
Expand All @@ -330,8 +378,7 @@ async function encryptMessageAndWrap(
);

const envelope = await buildEnvelope(envelopeType, recipient.key, networkTimestamp, cipherText);

const data = wrapEnvelope(envelope);
const data = wrapEnvelopeInWebSocketMessage(envelope);
const data64 = ByteBuffer.wrap(data).toString('base64');

return {
Expand Down Expand Up @@ -423,7 +470,7 @@ async function buildEnvelope(
* This is an outdated practice and we should probably just send the envelope data directly.
* Something to think about in the future.
*/
function wrapEnvelope(envelope: SignalService.Envelope): Uint8Array {
function wrapEnvelopeInWebSocketMessage(envelope: SignalService.Envelope): Uint8Array {
const request = SignalService.WebSocketRequestMessage.create({
id: 0,
body: SignalService.Envelope.encode(envelope).finish(),
Expand Down
11 changes: 6 additions & 5 deletions ts/session/utils/job_runners/jobs/UserSyncJob.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* eslint-disable no-await-in-loop */
import { PubkeyType } from 'libsession_util_nodejs';
import { isArray, isEmpty, isNumber } from 'lodash';
import { isArray, isEmpty, isNumber, isString } from 'lodash';
import { v4 } from 'uuid';
import { UserUtils } from '../..';
import { ConfigDumpData } from '../../../../data/configDump/configDump';
Expand All @@ -17,9 +17,9 @@ import { LibSessionUtil, UserSuccessfulChange } from '../../libsession/libsessio
import { runners } from '../JobRunner';
import {
AddJobCheckReturn,
UserSyncPersistedData,
PersistedJob,
RunJobResult,
UserSyncPersistedData,
} from '../PersistedJob';

const defaultMsBetweenRetries = 15000; // a long time between retries, to avoid running multiple jobs at the same time, when one was postponed at the same time as one already planned (5s)
Expand Down Expand Up @@ -61,8 +61,8 @@ function triggerConfSyncJobDone() {
window.Whisper.events.trigger(UserSyncJobDone);
}

function isPubkey(us: string): us is PubkeyType {
return us.startsWith('05');
function isPubkey(us: unknown): us is PubkeyType {
return isString(us) && us.startsWith('05');
}

async function pushChangesToUserSwarmIfNeeded() {
Expand Down Expand Up @@ -153,7 +153,7 @@ class UserSyncJob extends PersistedJob<UserSyncPersistedData> {
return RunJobResult.PermanentFailure;
}

return await pushChangesToUserSwarmIfNeeded();
return await UserSync.pushChangesToUserSwarmIfNeeded();
// eslint-disable-next-line no-useless-catch
} catch (e) {
throw e;
Expand Down Expand Up @@ -228,5 +228,6 @@ async function queueNewJobIfNeeded() {

export const UserSync = {
UserSyncJob,
pushChangesToUserSwarmIfNeeded,
queueNewJobIfNeeded: () => allowOnlyOneAtATime('UserSyncJob-oneAtAtTime', queueNewJobIfNeeded),
};
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ describe('LibSessionUtil pendingChangesForGroup', () => {
});
});

describe('LibSessionUtil pendingChangesForUser', () => {
describe('LibSessionUtil pendingChangesForUs', () => {
beforeEach(async () => {});

afterEach(() => {
Expand Down

0 comments on commit 9492fdc

Please sign in to comment.