Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.map
Expand Down Expand Up @@ -58,6 +59,7 @@ import org.thoughtcrime.securesms.util.castAwayType
import java.util.EnumSet
import java.util.concurrent.TimeUnit
import javax.inject.Inject
import kotlin.math.log

private const val TAG = "ConfigToDatabaseSync"

Expand Down Expand Up @@ -93,17 +95,20 @@ class ConfigToDatabaseSync @Inject constructor(
// Sync conversations from config -> database
scope.launch {
preferences.watchLocalNumber()
.filterNotNull()
.take(1)
.flatMapLatest {
combine(
conversationRepository.conversationListAddressesFlow,
configFactory.userConfigsChanged(EnumSet.of(UserConfigType.CONVO_INFO_VOLATILE))
.castAwayType()
.onStart { emit(Unit) }
.map { _ -> configFactory.withUserConfigs { it.convoInfoVolatile.all() } },
::Pair
)
.map { it != null }
.flatMapLatest { loggedIn ->
if (loggedIn) {
combine(
conversationRepository.conversationListAddressesFlow,
configFactory.userConfigsChanged(EnumSet.of(UserConfigType.CONVO_INFO_VOLATILE))
.castAwayType()
.onStart { emit(Unit) }
.map { _ -> configFactory.withUserConfigs { it.convoInfoVolatile.all() } },
::Pair
)
} else {
emptyFlow()
}
}
.distinctUntilChanged()
.collectLatest { (conversations, convoInfo) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart
Expand Down Expand Up @@ -43,21 +43,24 @@ class BlindMappingRepository @Inject constructor(
*/
@Suppress("OPT_IN_USAGE")
val mappings: StateFlow<Map<CommunityServerUrl, Map<Address.Blinded, Address.Standard>>> = prefs.watchLocalNumber()
.filterNotNull()
.flatMapLatest { localAddress ->
configFactory
.userConfigsChanged(setOf(UserConfigType.USER_GROUPS, UserConfigType.CONTACTS))
.castAwayType()
.onStart { emit(Unit) }
.map {
configFactory.withUserConfigs { configs ->
Pair(
configs.userGroups.allCommunityInfo().map { it.community },
configs.contacts.all().map { Address.Standard(AccountId(it.id)) }
+ Address.Standard(AccountId(localAddress))
)
if (localAddress.isNullOrBlank()) {
emptyFlow()
} else {
configFactory
.userConfigsChanged(setOf(UserConfigType.USER_GROUPS, UserConfigType.CONTACTS))
.castAwayType()
.onStart { emit(Unit) }
.map {
configFactory.withUserConfigs { configs ->
Pair(
configs.userGroups.allCommunityInfo().map { it.community },
configs.contacts.all().map { Address.Standard(AccountId(it.id)) }
+ Address.Standard(AccountId(localAddress))
)
}
}
}
}
}
.distinctUntilChanged()
.map { (allCommunities, allContacts) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.filterNotNull
Expand Down Expand Up @@ -93,9 +94,13 @@ class RecipientRepository @Inject constructor(

fun observeSelf(): Flow<Recipient> {
return preferences.watchLocalNumber()
.filterNotNull()
.distinctUntilChanged()
.flatMapLatest { observeRecipient(it.toAddress()) }
.flatMapLatest {
if (it.isNullOrBlank()) {
emptyFlow()
} else {
observeRecipient(it.toAddress())
}
}
}

fun getSelf(): Recipient {
Expand Down