Skip to content

Commit

Permalink
perf: schedule updating conversation read status [WPB-9216] (#2865)
Browse files Browse the repository at this point in the history
* perf: schedule updating conversation read status

Instead of just doing it immediately when called, schedule it to be executed using the new ParallelConversationWorkQueue, that allows one work to be executed at a time per conversation.

Also, add a little debounce to the queue, in case the user is scrolling down through the messages.

* refactor: use constructor Function instead of Impl class
  • Loading branch information
vitorhugods committed Jul 8, 2024
1 parent 26d21ef commit a89ab59
Show file tree
Hide file tree
Showing 12 changed files with 326 additions and 42 deletions.
2 changes: 0 additions & 2 deletions detekt/baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -664,8 +664,6 @@
<ID>NoUnusedImports:SearchRepositoryArrangement.kt$com.wire.kalium.logic.util.arrangement.repository.SearchRepositoryArrangement.kt</ID>
<ID>NoUnusedImports:SearchUseCaseTest.kt$com.wire.kalium.logic.feature.search.SearchUseCaseTest.kt</ID>
<ID>NoUnusedImports:SendButtonActionMessageTest.kt$com.wire.kalium.logic.feature.message.SendButtonActionMessageTest.kt</ID>
<ID>NoUnusedImports:SendConfirmationUseCaseTest.kt$com.wire.kalium.logic.feature.message.SendConfirmationUseCaseTest.kt</ID>
<ID>NoUnusedImports:SendConfirmationUseCaseTest.kt$import io.mockative.coEvery</ID>
<ID>NoUnusedImports:ServerConfigDTOJson.kt$util.ServerConfigDTOJson.kt</ID>
<ID>NoUnusedImports:ServerConfigRepositoryArrangement.kt$com.wire.kalium.logic.util.arrangement.repository.ServerConfigRepositoryArrangement.kt</ID>
<ID>NoUnusedImports:SessionEstablisherTest.kt$com.wire.kalium.logic.feature.message.SessionEstablisherTest.kt</ID>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.functional.onSuccess
import com.wire.kalium.util.DelicateKaliumApi

internal interface SelfConversationIdProvider {
internal fun interface SelfConversationIdProvider {
suspend operator fun invoke(): Either<StorageFailure, List<ConversationId>>
}

internal interface ProteusSelfConversationIdProvider {
internal fun interface ProteusSelfConversationIdProvider {
suspend operator fun invoke(): Either<StorageFailure, ConversationId>
}

internal interface MLSSelfConversationIdProvider {
internal fun interface MLSSelfConversationIdProvider {
suspend operator fun invoke(): Either<StorageFailure, ConversationId>
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,17 @@ import com.wire.kalium.logic.feature.conversation.messagetimer.UpdateMessageTime
import com.wire.kalium.logic.feature.conversation.messagetimer.UpdateMessageTimerUseCaseImpl
import com.wire.kalium.logic.feature.conversation.mls.OneOnOneResolver
import com.wire.kalium.logic.feature.message.MessageSender
import com.wire.kalium.logic.feature.message.SendConfirmationUseCase
import com.wire.kalium.logic.feature.message.receipt.SendConfirmationUseCase
import com.wire.kalium.logic.feature.message.ephemeral.DeleteEphemeralMessagesAfterEndDateUseCase
import com.wire.kalium.logic.feature.message.receipt.ConversationWorkQueue
import com.wire.kalium.logic.feature.message.receipt.ParallelConversationWorkQueue
import com.wire.kalium.logic.feature.publicuser.RefreshUsersWithoutMetadataUseCase
import com.wire.kalium.logic.feature.team.DeleteTeamConversationUseCase
import com.wire.kalium.logic.feature.team.DeleteTeamConversationUseCaseImpl
import com.wire.kalium.logic.sync.SyncManager
import com.wire.kalium.logic.sync.receiver.conversation.RenamedConversationEventHandler
import com.wire.kalium.logic.sync.receiver.handler.CodeUpdateHandlerImpl
import com.wire.kalium.util.KaliumDispatcherImpl
import kotlinx.coroutines.CoroutineScope

@Suppress("LongParameterList")
Expand Down Expand Up @@ -195,6 +198,10 @@ class ConversationScope internal constructor(
val markConnectionRequestAsNotified: MarkConnectionRequestAsNotifiedUseCase
get() = MarkConnectionRequestAsNotifiedUseCaseImpl(connectionRepository)

private val conversationWorkQueue: ConversationWorkQueue by lazy {
ParallelConversationWorkQueue(scope, kaliumLogger, KaliumDispatcherImpl.default)
}

val updateConversationReadDateUseCase: UpdateConversationReadDateUseCase
get() = UpdateConversationReadDateUseCase(
conversationRepository,
Expand All @@ -203,7 +210,7 @@ class ConversationScope internal constructor(
selfUserId,
selfConversationIdProvider,
sendConfirmation,
scope,
conversationWorkQueue,
kaliumLogger
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,17 @@ import com.wire.kalium.logic.data.message.Message
import com.wire.kalium.logic.data.message.MessageContent
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.feature.message.MessageSender
import com.wire.kalium.logic.feature.message.SendConfirmationUseCase
import com.wire.kalium.logic.feature.message.receipt.ConversationTimeEventInput
import com.wire.kalium.logic.feature.message.receipt.ConversationTimeEventWorker
import com.wire.kalium.logic.feature.message.receipt.ConversationWorkQueue
import com.wire.kalium.logic.feature.message.receipt.SendConfirmationUseCase
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.foldToEitherWhileRight
import com.wire.kalium.logic.functional.onFailure
import com.wire.kalium.logic.functional.onSuccess
import com.wire.kalium.logic.kaliumLogger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.datetime.Clock
Expand All @@ -58,7 +61,7 @@ class UpdateConversationReadDateUseCase internal constructor(
private val selfUserId: UserId,
private val selfConversationIdProvider: SelfConversationIdProvider,
private val sendConfirmation: SendConfirmationUseCase,
private val scope: CoroutineScope,
private val workQueue: ConversationWorkQueue,
private val logger: KaliumLogger = kaliumLogger
) {

Expand All @@ -67,12 +70,20 @@ class UpdateConversationReadDateUseCase internal constructor(
* @param time The last read date to update.
*/
operator fun invoke(conversationId: QualifiedID, time: Instant) {
scope.launch {
workQueue.enqueue(ConversationTimeEventInput(conversationId, time), worker)
}

private val worker = ConversationTimeEventWorker { (conversationId, time) ->
coroutineScope {
conversationRepository.observeCacheDetailsById(conversationId).flatMap {
it.first()?.let { Either.Right(it) } ?: Either.Left(StorageFailure.DataNotFound)
}.onFailure {
logger.w("Failed to update conversation read date; StorageFailure $it")
}.onSuccess { conversation ->
if (conversation.lastReadDate >= time) {
// Skipping, as current lastRead is already newer than the scheduled one
return@onSuccess
}
launch {
sendConfirmation(conversationId, conversation.lastReadDate, time)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ import com.wire.kalium.logic.feature.message.ephemeral.EnqueueMessageSelfDeletio
import com.wire.kalium.logic.feature.message.ephemeral.EnqueueMessageSelfDeletionUseCaseImpl
import com.wire.kalium.logic.feature.message.ephemeral.EphemeralMessageDeletionHandler
import com.wire.kalium.logic.feature.message.ephemeral.EphemeralMessageDeletionHandlerImpl
import com.wire.kalium.logic.feature.message.receipt.SendConfirmationUseCase
import com.wire.kalium.logic.feature.selfDeletingMessages.ObserveSelfDeletionTimerSettingsForConversationUseCase
import com.wire.kalium.logic.feature.sessionreset.ResetSessionUseCase
import com.wire.kalium.logic.feature.sessionreset.ResetSessionUseCaseImpl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,20 @@ import com.wire.kalium.logic.data.id.ConversationId
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.time.Duration.Companion.seconds

/**
* A work queue that schedules and runs actions for conversations based on a time
*/
internal interface ConversationWorkQueue {
internal fun interface ConversationWorkQueue {
/**
* Enqueue [worker] to perform on the provided [input], to be executed according to the implementation.
*/
suspend fun enqueue(input: ConversationTimeEventInput, worker: ConversationTimeEventWorker)
fun enqueue(input: ConversationTimeEventInput, worker: ConversationTimeEventWorker)
}

/**
Expand Down Expand Up @@ -69,16 +71,26 @@ internal class ParallelConversationWorkQueue(
* If there's an existing work being done for the same [ConversationTimeEventInput.conversationId], it will wait until it's over.
* After that, will start working as soon as the [dispatcher] allows it, while the [scope] is alive.
*/
override suspend fun enqueue(
override fun enqueue(
input: ConversationTimeEventInput,
worker: ConversationTimeEventWorker
): Unit = mutex.withLock {
) {
scope.launch {
mutex.withLock {
enqueueWork(input, worker)
}
}
}

private fun ParallelConversationWorkQueue.enqueueWork(
input: ConversationTimeEventInput,
worker: ConversationTimeEventWorker
): Any {
val conversationId = input.conversationId
val time = input.eventTime
logger.v("Attempting to enqueue receipt work for conversation '${conversationId.toLogString()}', with time '$time'")
val work = ConversationTimeEventWork(input, worker)
val existingConversationQueue = conversationQueueMap[conversationId]
existingConversationQueue?.let {
return existingConversationQueue?.let {
val isNewWorkMoreRecent = time > it.value.conversationTimeEventInput.eventTime
if (isNewWorkMoreRecent) {
it.value = work
Expand All @@ -88,11 +100,11 @@ internal class ParallelConversationWorkQueue(
}
}

private suspend fun createQueueForConversation(initialWork: ConversationTimeEventWork) =
MutableStateFlow(initialWork).also {
private fun createQueueForConversation(initialWork: ConversationTimeEventWork) =
MutableStateFlow(initialWork).also { flow ->
scope.launch(dispatcher) {
@Suppress("TooGenericExceptionCaught")
it.collect { (conversationTimeEventInput, worker) ->
flow.debounce(QUEUE_DEBOUNCE).collect { (conversationTimeEventInput, worker) ->
try {
worker.doWork(conversationTimeEventInput)
} catch (t: Throwable) {
Expand All @@ -101,4 +113,8 @@ internal class ParallelConversationWorkQueue(
}
}
}

private companion object {
private val QUEUE_DEBOUNCE = 3.seconds
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* along with this program. If not, see http://www.gnu.org/licenses/.
*/

package com.wire.kalium.logic.feature.message
package com.wire.kalium.logic.feature.message.receipt

import com.benasher44.uuid.uuid4
import com.wire.kalium.logger.KaliumLogger
Expand All @@ -29,9 +29,11 @@ import com.wire.kalium.logic.data.id.CurrentClientIdProvider
import com.wire.kalium.logic.data.message.Message
import com.wire.kalium.logic.data.message.MessageContent
import com.wire.kalium.logic.data.message.MessageRepository
import com.wire.kalium.logic.data.message.MessageTarget
import com.wire.kalium.logic.data.message.receipt.ReceiptType
import com.wire.kalium.logic.data.properties.UserPropertyRepository
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.feature.message.MessageSender
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.fold
Expand All @@ -51,26 +53,32 @@ import kotlinx.serialization.json.Json
* - For 1:1 we take into consideration [UserPropertyRepository.getReadReceiptsStatus]
* - For group conversations we have to look for each group conversation configuration.
*/
@Suppress("LongParameterList")
internal class SendConfirmationUseCase internal constructor(
private val currentClientIdProvider: CurrentClientIdProvider,
private val syncManager: SyncManager,
private val messageSender: MessageSender,
private val selfUserId: UserId,
private val conversationRepository: ConversationRepository,
private val messageRepository: MessageRepository,
private val userPropertyRepository: UserPropertyRepository,
) {
private companion object {
const val TAG = "SendConfirmation"
const val conversationIdKey = "conversationId"
const val messageIdsKey = "messageIds"
const val statusKey = "status"
}
internal interface SendConfirmationUseCase {
suspend operator fun invoke(
conversationId: ConversationId,
afterDateTime: Instant,
untilDateTime: Instant
): Either<CoreFailure, Unit>
}

@Suppress("LongParameterList", "FunctionNaming")
internal fun SendConfirmationUseCase(
currentClientIdProvider: CurrentClientIdProvider,
syncManager: SyncManager,
messageSender: MessageSender,
selfUserId: UserId,
conversationRepository: ConversationRepository,
messageRepository: MessageRepository,
userPropertyRepository: UserPropertyRepository,
) = object : SendConfirmationUseCase {
val TAG = "SendConfirmation"
val conversationIdKey = "conversationId"
val messageIdsKey = "messageIds"
val statusKey = "status"

private val logger by lazy { kaliumLogger.withFeatureId(KaliumLogger.Companion.ApplicationFlow.MESSAGES) }

suspend operator fun invoke(
override suspend operator fun invoke(
conversationId: ConversationId,
afterDateTime: Instant,
untilDateTime: Instant
Expand Down Expand Up @@ -100,7 +108,7 @@ internal class SendConfirmationUseCase internal constructor(
expirationData = null
)

messageSender.sendMessage(message)
messageSender.sendMessage(message, messageTarget = MessageTarget.Conversation(setOf(selfUserId)))
}.onFailure {
logConfirmationError(conversationId, messageIds, it)
}.onSuccess {
Expand Down Expand Up @@ -170,6 +178,6 @@ internal class SendConfirmationUseCase internal constructor(
val jsonLogString = "${properties.toJsonElement()}"
val logMessage = "$TAG: $jsonLogString"

logger.i("$logMessage")
logger.i(logMessage)
}
}
Loading

0 comments on commit a89ab59

Please sign in to comment.