Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import org.thoughtcrime.securesms.dependencies.ConfigFactory
import org.thoughtcrime.securesms.mms.MediaConstraints
import org.thoughtcrime.securesms.pro.ProStatusManager
import org.thoughtcrime.securesms.pro.ProDataState
import org.thoughtcrime.securesms.pro.ProDetailsRepository
import org.thoughtcrime.securesms.pro.getDefaultSubscriptionStateData
import org.thoughtcrime.securesms.reviews.InAppReviewManager
import org.thoughtcrime.securesms.ui.SimpleDialogData
Expand Down Expand Up @@ -82,6 +83,7 @@ class SettingsViewModel @Inject constructor(
private val inAppReviewManager: InAppReviewManager,
private val avatarUploadManager: AvatarUploadManager,
private val attachmentProcessor: AttachmentProcessor,
val proDetailsRepository: ProDetailsRepository,
) : ViewModel() {
private val TAG = "SettingsViewModel"

Expand Down Expand Up @@ -153,6 +155,8 @@ class SettingsViewModel @Inject constructor(
_uiState.update { it.copy(avatarData = data) }
}
}

proDetailsRepository.requestRefresh()
}

private fun getVersionNumber(): CharSequence {
Expand Down
21 changes: 21 additions & 0 deletions app/src/main/java/org/thoughtcrime/securesms/pro/Flows.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.thoughtcrime.securesms.pro

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.onStart
import org.session.libsession.utilities.TextSecurePreferences
import org.thoughtcrime.securesms.util.castAwayType

/**
* Creates a flow that only emits when the debug flag forcePostPro is enabled.
*/
fun <T> TextSecurePreferences.flowPostProLaunch(flowFactory: () -> Flow<T>): Flow<T> {
@Suppress("OPT_IN_USAGE")
return TextSecurePreferences.events
.filter { it == TextSecurePreferences.SET_FORCE_POST_PRO }
.castAwayType()
.onStart { emit(Unit) }
.filter { forcePostPro() }
.flatMapLatest { flowFactory() }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package org.thoughtcrime.securesms.pro

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.selects.select
import kotlinx.coroutines.selects.onTimeout
import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsignal.utilities.Log
import org.thoughtcrime.securesms.auth.LoginStateRepository
import org.thoughtcrime.securesms.dependencies.ManagerScope
import org.thoughtcrime.securesms.pro.api.GetProDetailsRequest
import org.thoughtcrime.securesms.pro.api.ProApiExecutor
import org.thoughtcrime.securesms.pro.api.ProDetails
import org.thoughtcrime.securesms.pro.api.successOrThrow
import org.thoughtcrime.securesms.pro.db.ProDatabase
import org.thoughtcrime.securesms.util.NetworkConnectivity
import java.time.Duration
import java.time.Instant
import javax.inject.Inject
import javax.inject.Singleton
import kotlin.coroutines.cancellation.CancellationException

@Singleton
class ProDetailsRepository @Inject constructor(
private val db: ProDatabase,
private val apiExecutor: ProApiExecutor,
private val getProDetailsRequestFactory: GetProDetailsRequest.Factory,
private val loginStateRepository: LoginStateRepository,
private val prefs: TextSecurePreferences,
networkConnectivity: NetworkConnectivity,
@ManagerScope scope: CoroutineScope,
) {
sealed interface LoadState {
val lastUpdated: Pair<ProDetails, Instant>?

data object Init : LoadState {
override val lastUpdated: Pair<ProDetails, Instant>?
get() = null
}

data class Loading(
override val lastUpdated: Pair<ProDetails, Instant>?,
val waitingForNetwork: Boolean
) : LoadState

data class Loaded(override val lastUpdated: Pair<ProDetails, Instant>) : LoadState
data class Error(override val lastUpdated: Pair<ProDetails, Instant>?) : LoadState
}

private val refreshRequests: SendChannel<Unit>

val loadState: StateFlow<LoadState>

init {
val channel = Channel<Unit>(capacity = 10, onBufferOverflow = BufferOverflow.DROP_OLDEST)

refreshRequests = channel
@Suppress("OPT_IN_USAGE")
loadState = prefs.flowPostProLaunch {
loginStateRepository.loggedInState
.mapNotNull { it?.seeded?.proMasterPrivateKey }
}.distinctUntilChanged()
.flatMapLatest { proMasterKey ->
flow {
var last = db.getProDetailsAndLastUpdated()
var numRetried = 0

while (true) {
// Drain all pending requests as we are about to execute a request
while (channel.tryReceive().isSuccess) { }

var retryingAt: Instant? = null

if (last != null && last.second.plusSeconds(MIN_UPDATE_INTERVAL_SECONDS) >= Instant.now()) {
Log.d(TAG, "Pro details is fresh enough, skipping fetch")
// Last update was recent enough, skip fetching
emit(LoadState.Loaded(last))
} else {
if (!networkConnectivity.networkAvailable.value) {
// No network...mark the state and wait for it to come back
emit(LoadState.Loading(last, waitingForNetwork = true))
networkConnectivity.networkAvailable.first { it }
}

emit(LoadState.Loading(last, waitingForNetwork = false))

// Fetch new details
try {
Log.d(TAG, "Start fetching Pro details from backend")
last = apiExecutor.executeRequest(
request = getProDetailsRequestFactory.create(proMasterKey)
).successOrThrow() to Instant.now()

db.updateProDetails(last.first, last.second)

Log.d(TAG, "Successfully fetched Pro details from backend")
emit(LoadState.Loaded(last))
numRetried = 0
} catch (e: Exception) {
if (e is CancellationException) throw e

emit(LoadState.Error(last))

// Exponential backoff for retries, capped at 2 minutes
val delaySeconds = minOf(10L * (1L shl numRetried), 120L)
Log.e(TAG, "Error fetching Pro details from backend, retrying in ${delaySeconds}s", e)

retryingAt = Instant.now().plusSeconds(delaySeconds)
numRetried++
}
}


// Wait until either a refresh is requested, or it's time to retry
select {
refreshRequests.onReceiveCatching {
Log.d(TAG, "Manual refresh requested")
}

if (retryingAt != null) {
val delayMillis =
Duration.between(Instant.now(), retryingAt).toMillis()
onTimeout(delayMillis) {
Log.d(TAG, "Retrying Pro details fetch after delay")
}
}
}
}
}
}.stateIn(scope, SharingStarted.Eagerly, LoadState.Init)
}

fun requestRefresh() {
refreshRequests.trySend(Unit)
}

companion object {
private const val TAG = "ProDetailsRepository"
private const val MIN_UPDATE_INTERVAL_SECONDS = 120L
}
}
21 changes: 14 additions & 7 deletions app/src/main/java/org/thoughtcrime/securesms/pro/ProStatePoller.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@ import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select
import org.session.libsession.snode.SnodeClock
import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsignal.utilities.Log
import org.thoughtcrime.securesms.auth.LoginStateRepository
import org.thoughtcrime.securesms.dependencies.ManagerScope
Expand All @@ -28,6 +32,7 @@ import org.thoughtcrime.securesms.pro.api.ProDetails
import org.thoughtcrime.securesms.pro.api.successOrThrow
import org.thoughtcrime.securesms.pro.db.ProDatabase
import org.thoughtcrime.securesms.util.NetworkConnectivity
import org.thoughtcrime.securesms.util.castAwayType
import java.time.Duration
import java.time.Instant
import javax.inject.Inject
Expand All @@ -45,6 +50,8 @@ class ProStatePoller @Inject constructor(
private val proDatabase: ProDatabase,
private val snodeClock: SnodeClock,
private val apiExecutor: ProApiExecutor,
private val proDetailsRepository: ProDetailsRepository,
prefs: TextSecurePreferences,
@ManagerScope scope: CoroutineScope,
): OnAppStartupComponent {
private val manualPollRequest = Channel<PollToken>()
Expand All @@ -56,9 +63,11 @@ class ProStatePoller @Inject constructor(
}

@OptIn(ExperimentalCoroutinesApi::class)
val pollState: StateFlow<PollState> = loginStateRepository
.loggedInState
.map { it?.seeded?.proMasterPrivateKey }
val pollState: StateFlow<PollState> = prefs.flowPostProLaunch {
loginStateRepository
.loggedInState
.map { it?.seeded?.proMasterPrivateKey }
}
.distinctUntilChanged()
.flatMapLatest { proMasterPrivateKey ->
if (proMasterPrivateKey == null) {
Expand Down Expand Up @@ -148,11 +157,9 @@ class ProStatePoller @Inject constructor(

if (currentProof == null || currentProof.expiryMs <= snodeClock.currentTimeMills()) {
// Current proof is missing or expired, grab the pro details to decide what to do next
val details = apiExecutor.executeRequest(
request = getProDetailsRequestFactory.create(masterPrivateKey = proMasterPrivateKey)
).successOrThrow()
proDetailsRepository.requestRefresh()

proDatabase.updateProDetails(details, Instant.now())
val details = proDetailsRepository.loadState.mapNotNull { it.lastUpdated }.first().first

val newProof = if (details.status == ProDetails.DETAILS_STATUS_ACTIVE) {
Log.d(TAG, "User is active Pro but has no valid proof, generating new proof")
Expand Down