Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.session.libsession.messaging.sending_receiving

import network.loki.messenger.libsession_util.SessionEncrypt
import network.loki.messenger.libsession_util.protocol.DecodedEnvelope
import network.loki.messenger.libsession_util.protocol.SessionProtocol
import org.session.libsession.database.StorageProtocol
Expand All @@ -15,11 +16,13 @@ import org.session.libsession.messaging.messages.control.UnsendRequest
import org.session.libsession.messaging.messages.visible.VisibleMessage
import org.session.libsession.messaging.open_groups.OpenGroupApi
import org.session.libsession.snode.SnodeClock
import org.session.libsession.utilities.Address
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsignal.exceptions.NonRetryableException
import org.session.libsignal.protos.SignalServiceProtos
import org.session.libsignal.utilities.AccountId
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.Hex
import org.session.libsignal.utilities.IdPrefix
import java.util.concurrent.TimeUnit
import javax.inject.Inject
Expand Down Expand Up @@ -237,4 +240,39 @@ class MessageParser @Inject constructor(
message.openGroupServerMessageID = msg.id
}
}

fun parseCommunityDirectMessage(
msg: OpenGroupApi.DirectMessage,
communityServerPubKeyHex: String,
currentUserEd25519PrivKey: ByteArray,
currentUserId: AccountId,
currentUserBlindedIDs: List<AccountId>,
): Pair<Message, SignalServiceProtos.Content> {
val (senderId, plaintext) = SessionEncrypt.decryptForBlindedRecipient(
ciphertext = Base64.decode(msg.message),
myEd25519Privkey = currentUserEd25519PrivKey,
openGroupPubkey = Hex.fromStringCondensed(communityServerPubKeyHex),
senderBlindedId = Hex.fromStringCondensed(msg.sender),
recipientBlindId = Hex.fromStringCondensed(msg.recipient),
)

val decoded = SessionProtocol.decodeForCommunity(
payload = plaintext.data,
nowEpochMs = snodeClock.currentTimeMills(),
proBackendPubKey = proBackendKey,
)

val sender = Address.Standard(AccountId(senderId))

return parseMessage(
contentPlaintext = decoded.contentPlainText.data,
relaxSignatureCheck = true,
checkForBlockStatus = false,
isForGroup = false,
currentUserId = currentUserId,
sender = sender.accountId,
messageTimestampMs = (msg.postedAt * 1000),
currentUserBlindedIDs = currentUserBlindedIDs,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import okio.withLock
import org.session.libsession.database.MessageDataProvider
import org.session.libsession.database.userAuth
import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.messages.Message.Companion.senderOrSync
import org.session.libsession.messaging.messages.control.CallMessage
import org.session.libsession.messaging.messages.control.DataExtractionNotification
import org.session.libsession.messaging.messages.control.ExpirationTimerUpdate
Expand All @@ -29,6 +30,7 @@ import org.session.libsession.messaging.sending_receiving.notifications.MessageN
import org.session.libsession.messaging.utilities.WebRtcUtils
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.utilities.Address
import org.session.libsession.utilities.Address.Companion.toAddress
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsession.utilities.GroupUtil.doubleEncodeGroupID
import org.session.libsession.utilities.SSKEnvironment
Expand Down Expand Up @@ -76,6 +78,15 @@ class ReceivedMessageProcessor @Inject constructor(
) {
private val threadMutexes = ConcurrentHashMap<Address.Conversable, ReentrantLock>()

private inline fun <T> withThreadLock(
threadAddress: Address.Conversable,
block: () -> T
) {
threadMutexes.getOrPut(threadAddress) { ReentrantLock() }.withLock {
block()
}
}


/**
* Start a message processing session, ensuring that thread updates and notifications are handled
Expand Down Expand Up @@ -119,7 +130,7 @@ class ReceivedMessageProcessor @Inject constructor(
threadAddress: Address.Conversable,
message: Message,
proto: SignalServiceProtos.Content,
) = threadMutexes.getOrPut(threadAddress) { ReentrantLock() }.withLock {
) = withThreadLock(threadAddress) {
// The logic to check if the message should be discarded due to being from a hidden contact.
if (threadAddress is Address.Standard &&
message.sentTimestamp != null &&
Expand All @@ -130,7 +141,7 @@ class ReceivedMessageProcessor @Inject constructor(
)
) {
log { "Dropping message from hidden contact ${threadAddress.debugString}" }
return@withLock
return@withThreadLock
}

// Get or create thread ID, if we aren't allowed to create it, and it doesn't exist, drop the message
Expand All @@ -142,7 +153,7 @@ class ReceivedMessageProcessor @Inject constructor(
.also { id ->
if (id == -1L) {
log { "Dropping message for non-existing thread ${threadAddress.debugString}" }
return@withLock
return@withThreadLock
} else {
context.threadIDs[threadAddress] = id
}
Expand Down Expand Up @@ -201,23 +212,64 @@ class ReceivedMessageProcessor @Inject constructor(

fun processCommunityInboxMessage(
context: MessageProcessingContext,
communityServerUrl: String,
communityServerPubKeyHex: String,
message: OpenGroupApi.DirectMessage
) {
//TODO("Waiting for the implementation from libsession_util")
val (message, proto) = messageParser.parseCommunityDirectMessage(
msg = message,
currentUserId = context.currentUserId,
currentUserEd25519PrivKey = context.currentUserEd25519KeyPair.secretKey.data,
currentUserBlindedIDs = context.getCurrentUserBlindedIDsByServer(communityServerUrl),
communityServerPubKeyHex = communityServerPubKeyHex,
)

val threadAddress = message.senderOrSync.toAddress() as Address.Conversable

withThreadLock(threadAddress) {
processSwarmMessage(
context = context,
threadAddress = threadAddress,
message = message,
proto = proto
)
}
}

fun processCommunityOutboxMessage(
context: MessageProcessingContext,
message: OpenGroupApi.DirectMessage
communityServerUrl: String,
communityServerPubKeyHex: String,
msg: OpenGroupApi.DirectMessage
) {
//TODO("Waiting for the implementation from libsession_util")
val (message, proto) = messageParser.parseCommunityDirectMessage(
msg = msg,
currentUserId = context.currentUserId,
currentUserEd25519PrivKey = context.currentUserEd25519KeyPair.secretKey.data,
currentUserBlindedIDs = context.getCurrentUserBlindedIDsByServer(communityServerUrl),
communityServerPubKeyHex = communityServerPubKeyHex,
)

val threadAddress = Address.CommunityBlindedId(
serverUrl = communityServerUrl,
blindedId = Address.Blinded(AccountId(msg.recipient))
)

withThreadLock(threadAddress) {
processSwarmMessage(
context = context,
threadAddress = threadAddress,
message = message,
proto = proto
)
}
}

fun processCommunityMessage(
context: MessageProcessingContext,
threadAddress: Address.Community,
message: OpenGroupApi.Message,
) = threadMutexes.getOrPut(threadAddress) { ReentrantLock() }.withLock {
) = withThreadLock(threadAddress) {
var messageId = messageParser.parseCommunityMessage(
msg = message,
currentUserId = context.currentUserId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,12 @@ class OpenGroupPoller @AssistedInject constructor(
if (messages.isEmpty()) return
val sorted = messages.sortedBy { it.postedAt }

val serverPubKeyHex = storage.getOpenGroupPublicKey(server)
?: run {
Log.e(TAG, "No community server public key cannot process inbox messages")
return
}

receivedMessageProcessor.startProcessing("CommunityInbox") { ctx ->
for (apiMessage in sorted) {
try {
Expand All @@ -340,6 +346,8 @@ class OpenGroupPoller @AssistedInject constructor(
receivedMessageProcessor.processCommunityInboxMessage(
context = ctx,
message = apiMessage,
communityServerUrl = server,
communityServerPubKeyHex = serverPubKeyHex,
)

} catch (e: Exception) {
Expand All @@ -358,18 +366,26 @@ class OpenGroupPoller @AssistedInject constructor(
if (messages.isEmpty()) return
val sorted = messages.sortedBy { it.postedAt }

val serverPubKeyHex = storage.getOpenGroupPublicKey(server)
?: run {
Log.e(TAG, "No community server public key cannot process inbox messages")
return
}

receivedMessageProcessor.startProcessing("CommunityOutbox") { ctx ->
for (apiMessage in sorted) {
try {
storage.setLastOutboxMessageId(server, sorted.last().id)

receivedMessageProcessor.processCommunityOutboxMessage(
context = ctx,
message = apiMessage,
msg = apiMessage,
communityServerUrl = server,
communityServerPubKeyHex = serverPubKeyHex,
)

} catch (e: Exception) {
Log.e(TAG, "Error processing inbox message", e)
Log.e(TAG, "Error processing outbox message", e)
}
}
}
Expand Down