From 019f6becb0bb47514936c13941248daa24be9677 Mon Sep 17 00:00:00 2001 From: SessionHero01 <180888785+SessionHero01@users.noreply.github.com> Date: Mon, 10 Nov 2025 14:52:25 +1100 Subject: [PATCH] Blinded request message parsing and processing --- .../sending_receiving/MessageParser.kt | 38 +++++++++++ .../ReceivedMessageProcessor.kt | 66 +++++++++++++++++-- .../pollers/OpenGroupPoller.kt | 20 +++++- 3 files changed, 115 insertions(+), 9 deletions(-) diff --git a/app/src/main/java/org/session/libsession/messaging/sending_receiving/MessageParser.kt b/app/src/main/java/org/session/libsession/messaging/sending_receiving/MessageParser.kt index 26daf05fc4..4cf5defd89 100644 --- a/app/src/main/java/org/session/libsession/messaging/sending_receiving/MessageParser.kt +++ b/app/src/main/java/org/session/libsession/messaging/sending_receiving/MessageParser.kt @@ -1,5 +1,6 @@ package org.session.libsession.messaging.sending_receiving +import network.loki.messenger.libsession_util.SessionEncrypt import network.loki.messenger.libsession_util.protocol.DecodedEnvelope import network.loki.messenger.libsession_util.protocol.SessionProtocol import org.session.libsession.database.StorageProtocol @@ -15,11 +16,13 @@ import org.session.libsession.messaging.messages.control.UnsendRequest import org.session.libsession.messaging.messages.visible.VisibleMessage import org.session.libsession.messaging.open_groups.OpenGroupApi import org.session.libsession.snode.SnodeClock +import org.session.libsession.utilities.Address import org.session.libsession.utilities.ConfigFactoryProtocol import org.session.libsignal.exceptions.NonRetryableException import org.session.libsignal.protos.SignalServiceProtos import org.session.libsignal.utilities.AccountId import org.session.libsignal.utilities.Base64 +import org.session.libsignal.utilities.Hex import org.session.libsignal.utilities.IdPrefix import java.util.concurrent.TimeUnit import javax.inject.Inject @@ -237,4 +240,39 @@ class MessageParser @Inject constructor( message.openGroupServerMessageID = msg.id } } + + fun parseCommunityDirectMessage( + msg: OpenGroupApi.DirectMessage, + communityServerPubKeyHex: String, + currentUserEd25519PrivKey: ByteArray, + currentUserId: AccountId, + currentUserBlindedIDs: List, + ): Pair { + val (senderId, plaintext) = SessionEncrypt.decryptForBlindedRecipient( + ciphertext = Base64.decode(msg.message), + myEd25519Privkey = currentUserEd25519PrivKey, + openGroupPubkey = Hex.fromStringCondensed(communityServerPubKeyHex), + senderBlindedId = Hex.fromStringCondensed(msg.sender), + recipientBlindId = Hex.fromStringCondensed(msg.recipient), + ) + + val decoded = SessionProtocol.decodeForCommunity( + payload = plaintext.data, + nowEpochMs = snodeClock.currentTimeMills(), + proBackendPubKey = proBackendKey, + ) + + val sender = Address.Standard(AccountId(senderId)) + + return parseMessage( + contentPlaintext = decoded.contentPlainText.data, + relaxSignatureCheck = true, + checkForBlockStatus = false, + isForGroup = false, + currentUserId = currentUserId, + sender = sender.accountId, + messageTimestampMs = (msg.postedAt * 1000), + currentUserBlindedIDs = currentUserBlindedIDs, + ) + } } \ No newline at end of file diff --git a/app/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageProcessor.kt b/app/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageProcessor.kt index bbb3d408ea..9dc0ce1eb5 100644 --- a/app/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageProcessor.kt +++ b/app/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageProcessor.kt @@ -14,6 +14,7 @@ import okio.withLock import org.session.libsession.database.MessageDataProvider import org.session.libsession.database.userAuth import org.session.libsession.messaging.messages.Message +import org.session.libsession.messaging.messages.Message.Companion.senderOrSync import org.session.libsession.messaging.messages.control.CallMessage import org.session.libsession.messaging.messages.control.DataExtractionNotification import org.session.libsession.messaging.messages.control.ExpirationTimerUpdate @@ -29,6 +30,7 @@ import org.session.libsession.messaging.sending_receiving.notifications.MessageN import org.session.libsession.messaging.utilities.WebRtcUtils import org.session.libsession.snode.SnodeAPI import org.session.libsession.utilities.Address +import org.session.libsession.utilities.Address.Companion.toAddress import org.session.libsession.utilities.ConfigFactoryProtocol import org.session.libsession.utilities.GroupUtil.doubleEncodeGroupID import org.session.libsession.utilities.SSKEnvironment @@ -76,6 +78,15 @@ class ReceivedMessageProcessor @Inject constructor( ) { private val threadMutexes = ConcurrentHashMap() + private inline fun withThreadLock( + threadAddress: Address.Conversable, + block: () -> T + ) { + threadMutexes.getOrPut(threadAddress) { ReentrantLock() }.withLock { + block() + } + } + /** * Start a message processing session, ensuring that thread updates and notifications are handled @@ -119,7 +130,7 @@ class ReceivedMessageProcessor @Inject constructor( threadAddress: Address.Conversable, message: Message, proto: SignalServiceProtos.Content, - ) = threadMutexes.getOrPut(threadAddress) { ReentrantLock() }.withLock { + ) = withThreadLock(threadAddress) { // The logic to check if the message should be discarded due to being from a hidden contact. if (threadAddress is Address.Standard && message.sentTimestamp != null && @@ -130,7 +141,7 @@ class ReceivedMessageProcessor @Inject constructor( ) ) { log { "Dropping message from hidden contact ${threadAddress.debugString}" } - return@withLock + return@withThreadLock } // Get or create thread ID, if we aren't allowed to create it, and it doesn't exist, drop the message @@ -142,7 +153,7 @@ class ReceivedMessageProcessor @Inject constructor( .also { id -> if (id == -1L) { log { "Dropping message for non-existing thread ${threadAddress.debugString}" } - return@withLock + return@withThreadLock } else { context.threadIDs[threadAddress] = id } @@ -201,23 +212,64 @@ class ReceivedMessageProcessor @Inject constructor( fun processCommunityInboxMessage( context: MessageProcessingContext, + communityServerUrl: String, + communityServerPubKeyHex: String, message: OpenGroupApi.DirectMessage ) { - //TODO("Waiting for the implementation from libsession_util") + val (message, proto) = messageParser.parseCommunityDirectMessage( + msg = message, + currentUserId = context.currentUserId, + currentUserEd25519PrivKey = context.currentUserEd25519KeyPair.secretKey.data, + currentUserBlindedIDs = context.getCurrentUserBlindedIDsByServer(communityServerUrl), + communityServerPubKeyHex = communityServerPubKeyHex, + ) + + val threadAddress = message.senderOrSync.toAddress() as Address.Conversable + + withThreadLock(threadAddress) { + processSwarmMessage( + context = context, + threadAddress = threadAddress, + message = message, + proto = proto + ) + } } fun processCommunityOutboxMessage( context: MessageProcessingContext, - message: OpenGroupApi.DirectMessage + communityServerUrl: String, + communityServerPubKeyHex: String, + msg: OpenGroupApi.DirectMessage ) { - //TODO("Waiting for the implementation from libsession_util") + val (message, proto) = messageParser.parseCommunityDirectMessage( + msg = msg, + currentUserId = context.currentUserId, + currentUserEd25519PrivKey = context.currentUserEd25519KeyPair.secretKey.data, + currentUserBlindedIDs = context.getCurrentUserBlindedIDsByServer(communityServerUrl), + communityServerPubKeyHex = communityServerPubKeyHex, + ) + + val threadAddress = Address.CommunityBlindedId( + serverUrl = communityServerUrl, + blindedId = Address.Blinded(AccountId(msg.recipient)) + ) + + withThreadLock(threadAddress) { + processSwarmMessage( + context = context, + threadAddress = threadAddress, + message = message, + proto = proto + ) + } } fun processCommunityMessage( context: MessageProcessingContext, threadAddress: Address.Community, message: OpenGroupApi.Message, - ) = threadMutexes.getOrPut(threadAddress) { ReentrantLock() }.withLock { + ) = withThreadLock(threadAddress) { var messageId = messageParser.parseCommunityMessage( msg = message, currentUserId = context.currentUserId, diff --git a/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt b/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt index 9ad9213197..e83caa4d04 100644 --- a/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt +++ b/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt @@ -332,6 +332,12 @@ class OpenGroupPoller @AssistedInject constructor( if (messages.isEmpty()) return val sorted = messages.sortedBy { it.postedAt } + val serverPubKeyHex = storage.getOpenGroupPublicKey(server) + ?: run { + Log.e(TAG, "No community server public key cannot process inbox messages") + return + } + receivedMessageProcessor.startProcessing("CommunityInbox") { ctx -> for (apiMessage in sorted) { try { @@ -340,6 +346,8 @@ class OpenGroupPoller @AssistedInject constructor( receivedMessageProcessor.processCommunityInboxMessage( context = ctx, message = apiMessage, + communityServerUrl = server, + communityServerPubKeyHex = serverPubKeyHex, ) } catch (e: Exception) { @@ -358,6 +366,12 @@ class OpenGroupPoller @AssistedInject constructor( if (messages.isEmpty()) return val sorted = messages.sortedBy { it.postedAt } + val serverPubKeyHex = storage.getOpenGroupPublicKey(server) + ?: run { + Log.e(TAG, "No community server public key cannot process inbox messages") + return + } + receivedMessageProcessor.startProcessing("CommunityOutbox") { ctx -> for (apiMessage in sorted) { try { @@ -365,11 +379,13 @@ class OpenGroupPoller @AssistedInject constructor( receivedMessageProcessor.processCommunityOutboxMessage( context = ctx, - message = apiMessage, + msg = apiMessage, + communityServerUrl = server, + communityServerPubKeyHex = serverPubKeyHex, ) } catch (e: Exception) { - Log.e(TAG, "Error processing inbox message", e) + Log.e(TAG, "Error processing outbox message", e) } } }