From 56ae502d70c98d9e01bf12ceff245b6d4ef3385b Mon Sep 17 00:00:00 2001 From: SessionHero01 <180888785+SessionHero01@users.noreply.github.com> Date: Fri, 21 Nov 2025 09:24:52 +1100 Subject: [PATCH 1/3] Updated pro polling logic --- .../dependencies/OnAppStartupComponents.kt | 6 +- .../preferences/SettingsViewModel.kt | 4 +- .../securesms/pro/ProDetailsRepository.kt | 57 ++++- .../thoughtcrime/securesms/pro/ProProofs.kt | 1 + .../securesms/pro/ProStatePoller.kt | 228 ------------------ .../securesms/pro/ProStatusManager.kt | 52 ++++ .../pro/RevocationListPollingWorker.kt | 85 +++++++ 7 files changed, 191 insertions(+), 242 deletions(-) delete mode 100644 app/src/main/java/org/thoughtcrime/securesms/pro/ProStatePoller.kt create mode 100644 app/src/main/java/org/thoughtcrime/securesms/pro/RevocationListPollingWorker.kt diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/OnAppStartupComponents.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/OnAppStartupComponents.kt index c0b136e603..ef8a1b8c84 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/OnAppStartupComponents.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/OnAppStartupComponents.kt @@ -20,7 +20,7 @@ import org.thoughtcrime.securesms.logging.PersistentLogger import org.thoughtcrime.securesms.migration.DatabaseMigrationManager import org.thoughtcrime.securesms.notifications.BackgroundPollManager import org.thoughtcrime.securesms.notifications.PushRegistrationHandler -import org.thoughtcrime.securesms.pro.ProStatePoller +import org.thoughtcrime.securesms.pro.RevocationListPoller import org.thoughtcrime.securesms.pro.ProStatusManager import org.thoughtcrime.securesms.pro.subscription.SubscriptionCoordinator import org.thoughtcrime.securesms.pro.subscription.SubscriptionManager @@ -71,7 +71,7 @@ class OnAppStartupComponents private constructor( avatarUploadManager: AvatarUploadManager, configToDatabaseSync: ConfigToDatabaseSync, subscriptionManagers: Set<@JvmSuppressWildcards SubscriptionManager>, - proStatePoller: ProStatePoller, + revocationListPoller: RevocationListPoller, ): this( components = listOf( configUploader, @@ -103,7 +103,7 @@ class OnAppStartupComponents private constructor( subscriptionCoordinator, avatarUploadManager, configToDatabaseSync, - proStatePoller, + revocationListPoller, ) + subscriptionManagers ) } diff --git a/app/src/main/java/org/thoughtcrime/securesms/preferences/SettingsViewModel.kt b/app/src/main/java/org/thoughtcrime/securesms/preferences/SettingsViewModel.kt index ab9cce4dfc..346526d59b 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/preferences/SettingsViewModel.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/preferences/SettingsViewModel.kt @@ -156,7 +156,9 @@ class SettingsViewModel @Inject constructor( } } - proDetailsRepository.requestRefresh() + viewModelScope.launch { + proDetailsRepository.requestRefresh() + } } private fun getVersionNumber(): CharSequence { diff --git a/app/src/main/java/org/thoughtcrime/securesms/pro/ProDetailsRepository.kt b/app/src/main/java/org/thoughtcrime/securesms/pro/ProDetailsRepository.kt index a901b75a62..3c3e6584f3 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/pro/ProDetailsRepository.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/pro/ProDetailsRepository.kt @@ -30,13 +30,15 @@ import javax.inject.Inject import javax.inject.Singleton import kotlin.coroutines.cancellation.CancellationException +typealias ForceRefresh = Boolean + @Singleton class ProDetailsRepository @Inject constructor( private val db: ProDatabase, private val apiExecutor: ProApiExecutor, private val getProDetailsRequestFactory: GetProDetailsRequest.Factory, private val loginStateRepository: LoginStateRepository, - private val prefs: TextSecurePreferences, + prefs: TextSecurePreferences, networkConnectivity: NetworkConnectivity, @ManagerScope scope: CoroutineScope, ) { @@ -57,12 +59,12 @@ class ProDetailsRepository @Inject constructor( data class Error(override val lastUpdated: Pair?) : LoadState } - private val refreshRequests: SendChannel + private val refreshRequests: SendChannel val loadState: StateFlow init { - val channel = Channel(capacity = 10, onBufferOverflow = BufferOverflow.DROP_OLDEST) + val channel = Channel() refreshRequests = channel @Suppress("OPT_IN_USAGE") @@ -74,22 +76,44 @@ class ProDetailsRepository @Inject constructor( flow { var last = db.getProDetailsAndLastUpdated() var numRetried = 0 + var forceRefresh = false while (true) { // Drain all pending requests as we are about to execute a request - while (channel.tryReceive().isSuccess) { } + while (true) { + val result = channel.tryReceive() + when { + result.isClosed -> { + Log.w(TAG, "Refresh channel closed, stopping Pro details fetcher") + return@flow + } + + result.isSuccess -> { + forceRefresh = forceRefresh || result.getOrThrow() + } + + else -> break + } + } var retryingAt: Instant? = null - if (last != null && last.second.plusSeconds(MIN_UPDATE_INTERVAL_SECONDS) >= Instant.now()) { + if (!forceRefresh && last != null + && last.second.plusSeconds(MIN_UPDATE_INTERVAL_SECONDS) >= Instant.now()) { Log.d(TAG, "Pro details is fresh enough, skipping fetch") // Last update was recent enough, skip fetching emit(LoadState.Loaded(last)) } else { if (!networkConnectivity.networkAvailable.value) { - // No network...mark the state and wait for it to come back + // No network...mark the state and wait for the network to be online emit(LoadState.Loading(last, waitingForNetwork = true)) + networkConnectivity.networkAvailable.first { it } + + // We might have waited a while for the network to come back + // so drain the refresh requests again to avoid blocking the requesters + // for too long + while (channel.tryReceive().isSuccess) {} } emit(LoadState.Loading(last, waitingForNetwork = false)) @@ -118,13 +142,16 @@ class ProDetailsRepository @Inject constructor( retryingAt = Instant.now().plusSeconds(delaySeconds) numRetried++ } + + forceRefresh = false } // Wait until either a refresh is requested, or it's time to retry select { - refreshRequests.onReceiveCatching { - Log.d(TAG, "Manual refresh requested") + refreshRequests.onReceive { + Log.d(TAG, "Manual refresh requested: force = $it") + forceRefresh = it } if (retryingAt != null) { @@ -140,8 +167,18 @@ class ProDetailsRepository @Inject constructor( }.stateIn(scope, SharingStarted.Eagerly, LoadState.Init) } - fun requestRefresh() { - refreshRequests.trySend(Unit) + /** + * Requests a fresh of current user's pro details. By default, if last update is recent enough, + * no network request will be made. If [force] is true, a network request will be + * made regardless of the freshness of the last update. + */ + suspend fun requestRefresh(force: Boolean = false) { + if ((loadState.value as? LoadState.Loading)?.waitingForNetwork == true) { + Log.d(TAG, "Currently waiting for network for a fetch, no need to send another request") + return + } + + refreshRequests.send(force) } companion object { diff --git a/app/src/main/java/org/thoughtcrime/securesms/pro/ProProofs.kt b/app/src/main/java/org/thoughtcrime/securesms/pro/ProProofs.kt index 1d13b218f4..fc9af41c44 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/pro/ProProofs.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/pro/ProProofs.kt @@ -3,6 +3,7 @@ package org.thoughtcrime.securesms.pro import com.google.protobuf.ByteString import network.loki.messenger.libsession_util.pro.ProProof import org.session.libsignal.protos.SignalServiceProtos +import java.time.Instant /** * Copies values from a libsession ProProof into a protobuf-based ProProof. diff --git a/app/src/main/java/org/thoughtcrime/securesms/pro/ProStatePoller.kt b/app/src/main/java/org/thoughtcrime/securesms/pro/ProStatePoller.kt deleted file mode 100644 index 7fcee40396..0000000000 --- a/app/src/main/java/org/thoughtcrime/securesms/pro/ProStatePoller.kt +++ /dev/null @@ -1,228 +0,0 @@ -package org.thoughtcrime.securesms.pro - -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.flow.SharingStarted -import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.flow.distinctUntilChanged -import kotlinx.coroutines.flow.emptyFlow -import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.first -import kotlinx.coroutines.flow.flatMapLatest -import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.mapNotNull -import kotlinx.coroutines.flow.onStart -import kotlinx.coroutines.flow.stateIn -import kotlinx.coroutines.selects.onTimeout -import kotlinx.coroutines.selects.select -import org.session.libsession.snode.SnodeClock -import org.session.libsession.utilities.TextSecurePreferences -import org.session.libsignal.utilities.Log -import org.thoughtcrime.securesms.auth.LoginStateRepository -import org.thoughtcrime.securesms.dependencies.ManagerScope -import org.thoughtcrime.securesms.dependencies.OnAppStartupComponent -import org.thoughtcrime.securesms.pro.api.GenerateProProofRequest -import org.thoughtcrime.securesms.pro.api.GetProDetailsRequest -import org.thoughtcrime.securesms.pro.api.GetProRevocationRequest -import org.thoughtcrime.securesms.pro.api.ProApiExecutor -import org.thoughtcrime.securesms.pro.api.ProDetails -import org.thoughtcrime.securesms.pro.api.successOrThrow -import org.thoughtcrime.securesms.pro.db.ProDatabase -import org.thoughtcrime.securesms.util.NetworkConnectivity -import org.thoughtcrime.securesms.util.castAwayType -import java.time.Duration -import java.time.Instant -import javax.inject.Inject -import javax.inject.Singleton - -typealias PollToken = Channel> - -@Singleton -class ProStatePoller @Inject constructor( - loginStateRepository: LoginStateRepository, - private val connectivity: NetworkConnectivity, - private val getProDetailsRequestFactory: GetProDetailsRequest.Factory, - private val generateProProofRequest: GenerateProProofRequest.Factory, - private val getProRevocationRequestFactory: GetProRevocationRequest.Factory, - private val proDatabase: ProDatabase, - private val snodeClock: SnodeClock, - private val apiExecutor: ProApiExecutor, - private val proDetailsRepository: ProDetailsRepository, - prefs: TextSecurePreferences, - @ManagerScope scope: CoroutineScope, -): OnAppStartupComponent { - private val manualPollRequest = Channel() - - enum class PollState { - Init, - Polling, - UpToDate, - } - - @OptIn(ExperimentalCoroutinesApi::class) - val pollState: StateFlow = prefs.flowPostProLaunch { - loginStateRepository - .loggedInState - .map { it?.seeded?.proMasterPrivateKey } - } - .distinctUntilChanged() - .flatMapLatest { proMasterPrivateKey -> - if (proMasterPrivateKey == null) { - return@flatMapLatest emptyFlow() - } - - flow { - var numRetried = 0 - var nextPoll: Instant? = null - val pollTokens = mutableListOf() - - while (true) { - // Wait for network to become available - connectivity.networkAvailable.first { it } - - if (nextPoll != null) { - val now = Instant.now() - if (now < nextPoll) { - val delayMillis = Duration.between(now, nextPoll).toMillis() - Log.d(TAG, "Delaying next poll for $delayMillis ms") - select { - onTimeout(delayMillis) {} - manualPollRequest.onReceiveCatching { - if (it.isSuccess) { - pollTokens.add(it.getOrThrow()) - } - Log.d(TAG, "Manual poll requested") - } - } - } - } - - // Drain any manual poll requests - while (true) { - val received = manualPollRequest.tryReceive() - if (received.isSuccess) { - pollTokens += received.getOrThrow() - } else { - break - } - } - - emit(PollState.Polling) - Log.d(TAG, "Start polling Pro state") - - val result = runCatching { - pollOnce(proMasterPrivateKey) - emit(PollState.UpToDate) - Log.d(TAG, "Pro state polled successful") - } - - pollTokens.forEach { it.trySend(result) } - - nextPoll = when { - result.isSuccess -> { - numRetried = 0 - Instant.now().plusSeconds(POLL_INTERVAL_MINUTES * 60) - } - - result.exceptionOrNull() is CancellationException -> { - throw result.exceptionOrNull()!! - } - - else -> { - numRetried++ - val delaySeconds = (POLL_RETRY_INTERVAL_MIN_SECONDS * numRetried * 1.2).toLong() - .coerceAtMost(POLL_INTERVAL_MINUTES * 60) - - Log.e(TAG, "Error polling pro state, retrying in $delaySeconds seconds", result.exceptionOrNull()) - - Instant.now().plusSeconds(delaySeconds) - } - } - } - } - } - .stateIn(scope, started = SharingStarted.Eagerly, initialValue = PollState.Init) - - suspend fun requestPollOnceAndWait() { - val channel = Channel>() - manualPollRequest.send(channel) - channel.receive().getOrThrow() - } - - private suspend fun pollOnce(proMasterPrivateKey: ByteArray) { - val currentProof = proDatabase.getCurrentProProof() - - if (currentProof == null || currentProof.expiryMs <= snodeClock.currentTimeMills()) { - // Current proof is missing or expired, grab the pro details to decide what to do next - proDetailsRepository.requestRefresh() - - val details = proDetailsRepository.loadState.mapNotNull { it.lastUpdated }.first().first - - val newProof = if (details.status == ProDetails.DETAILS_STATUS_ACTIVE) { - Log.d(TAG, "User is active Pro but has no valid proof, generating new proof") - apiExecutor.executeRequest( - request = generateProProofRequest.create( - masterPrivateKey = proMasterPrivateKey, - rotatingPrivateKey = proDatabase.ensureValidRotatingKeys(snodeClock.currentTime()).ed25519PrivKey - ) - ).successOrThrow() - } else { - Log.d(TAG, "User is not active pro") - null - } - - Log.d(TAG, "Updating current pro proof to $newProof") - proDatabase.updateCurrentProProof(newProof) - } else { - // Current proof is still valid, we just need to check for revocation - val lastTicket = proDatabase.getLastRevocationTicket() - - val revocations = apiExecutor.executeRequest( - request = getProRevocationRequestFactory.create(lastTicket) - ).successOrThrow() - - proDatabase.updateRevocations( - newTicket = revocations.ticket, - data = revocations.items - ) - - if (proDatabase.isRevoked(currentProof.genIndexHashHex)) { - Log.d(TAG, "Current pro proof has been revoked, deleting") - proDatabase.updateCurrentProProof(null) - } else { - Log.d(TAG, "Current pro proof is still valid and not revoked") - } - } - } - - companion object { - private const val TAG = "ProStatePoller" - - - private const val POLL_INTERVAL_MINUTES = 1L - private const val POLL_RETRY_INTERVAL_MIN_SECONDS = 5L - -// fun schedule(context: Context) { -// WorkManager.getInstance(context) -// .enqueueUniquePeriodicWork( -// WORK_NAME, -// ExistingPeriodicWorkPolicy.KEEP, -// androidx.work.PeriodicWorkRequestBuilder( -// repeatInterval = Duration.ofMinutes(15) -// ).setConstraints( -// Constraints(requiredNetworkType = NetworkType.CONNECTED) -// ).setInitialDelay(5, TimeUnit.SECONDS) -// .setBackoffCriteria(BackoffPolicy.EXPONENTIAL, 10, TimeUnit.SECONDS) -// .build() -// ) -// } -// -// fun cancel(context: Context) { -// WorkManager.getInstance(context) -// .cancelUniqueWork(WORK_NAME) -// } - } -} \ No newline at end of file diff --git a/app/src/main/java/org/thoughtcrime/securesms/pro/ProStatusManager.kt b/app/src/main/java/org/thoughtcrime/securesms/pro/ProStatusManager.kt index 8596c696af..cac1c1cda9 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/pro/ProStatusManager.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/pro/ProStatusManager.kt @@ -8,9 +8,12 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.combine +import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.mapNotNull import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.stateIn import kotlinx.coroutines.flow.update @@ -35,8 +38,10 @@ import org.thoughtcrime.securesms.dependencies.ManagerScope import org.thoughtcrime.securesms.dependencies.OnAppStartupComponent import org.thoughtcrime.securesms.pro.api.AddPaymentErrorStatus import org.thoughtcrime.securesms.pro.api.AddProPaymentRequest +import org.thoughtcrime.securesms.pro.api.GenerateProProofRequest import org.thoughtcrime.securesms.pro.api.ProApiExecutor import org.thoughtcrime.securesms.pro.api.ProApiResponse +import org.thoughtcrime.securesms.pro.api.successOrThrow import org.thoughtcrime.securesms.pro.db.ProDatabase import org.thoughtcrime.securesms.pro.subscription.ProSubscriptionDuration import org.thoughtcrime.securesms.pro.subscription.SubscriptionManager @@ -55,6 +60,8 @@ class ProStatusManager @Inject constructor( private val loginState: LoginStateRepository, private val proDatabase: ProDatabase, private val snodeClock: SnodeClock, + private val proDetailsRepository: ProDetailsRepository, + private val generateProProofRequest: GenerateProProofRequest.Factory, ) : OnAppStartupComponent { val proDataState: StateFlow = combine( @@ -157,6 +164,51 @@ class ProStatusManager @Inject constructor( _postProLaunchStatus.update { isPostPro() } } } + + // Manage pro proof based on + scope.launch { + combine( + loginState.loggedInState.mapNotNull { it?.seeded?.proMasterPrivateKey }, + proDetailsRepository.loadState + .filter { it != ProDetailsRepository.LoadState.Init } + .map { it.lastUpdated?.first?.expiry }, + ::Pair + ).collectLatest { (proMasterKey, proValidUntil) -> + val currentProof = proDatabase.getCurrentProProof() + + val now = snodeClock.currentTime() + if (proValidUntil != null && proValidUntil > now) { + if (currentProof != null) { + val proofExpiry = Instant.ofEpochMilli(currentProof.expiryMs) + + val renewProofAt = if (proofExpiry < now) { + if (Duration.between(now, proValidUntil).toMinutes() < 60) { + now + } else { + proValidUntil - Duration.ofMinutes(10 + (Math.random() * 50).toLong()) + } + } else { + now + } + + Log.d(DebugLogGroup.PRO_SUBSCRIPTION.label, "Scheduling pro proof renewal at $renewProofAt") + if (renewProofAt > now) { + delay(Duration.between(now, renewProofAt).toMillis()) + } + } + + Log.d(DebugLogGroup.PRO_SUBSCRIPTION.label, "Generating new pro proof") + try { + val proof = apiExecutor.executeRequest(request = generateProProofRequest.create( + masterPrivateKey = proMasterKey, + rotatingPrivateKey = proDatabase.ensureValidRotatingKeys(now).ed25519PrivKey + )).successOrThrow() + } catch (e: Exception) { + TODO("Not yet implemented") + } + } + } + } } /** diff --git a/app/src/main/java/org/thoughtcrime/securesms/pro/RevocationListPollingWorker.kt b/app/src/main/java/org/thoughtcrime/securesms/pro/RevocationListPollingWorker.kt new file mode 100644 index 0000000000..997d061fa5 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/pro/RevocationListPollingWorker.kt @@ -0,0 +1,85 @@ +package org.thoughtcrime.securesms.pro + +import android.content.Context +import androidx.hilt.work.HiltWorker +import androidx.work.BackoffPolicy +import androidx.work.Constraints +import androidx.work.CoroutineWorker +import androidx.work.ExistingPeriodicWorkPolicy +import androidx.work.NetworkType +import androidx.work.PeriodicWorkRequestBuilder +import androidx.work.WorkManager +import androidx.work.WorkerParameters +import androidx.work.await +import dagger.assisted.Assisted +import dagger.assisted.AssistedInject +import org.session.libsession.snode.SnodeClock +import org.session.libsignal.exceptions.NonRetryableException +import org.session.libsignal.utilities.Log +import org.thoughtcrime.securesms.pro.api.GetProRevocationRequest +import org.thoughtcrime.securesms.pro.api.ProApiExecutor +import org.thoughtcrime.securesms.pro.api.successOrThrow +import org.thoughtcrime.securesms.pro.db.ProDatabase +import java.time.Duration +import java.util.concurrent.TimeUnit +import kotlin.coroutines.cancellation.CancellationException + +@HiltWorker +class RevocationListPollingWorker @AssistedInject constructor( + @Assisted context: Context, + @Assisted params: WorkerParameters, + private val proDatabase: ProDatabase, + private val getProRevocationRequestFactory: GetProRevocationRequest.Factory, + private val proApiExecutor: ProApiExecutor, + private val snodeClock: SnodeClock, +) : CoroutineWorker(context, params) { + override suspend fun doWork(): Result { + try { + val lastTicket = proDatabase.getLastRevocationTicket() + val response = proApiExecutor.executeRequest(request = getProRevocationRequestFactory.create(lastTicket)).successOrThrow() + proDatabase.updateRevocations( + data = response.items, + newTicket = response.ticket + ) + + proDatabase.pruneRevocations(snodeClock.currentTime()) + + return Result.success() + } catch (e: Exception) { + if (e is CancellationException) { + throw e + } + + Log.e(TAG, "Error polling revocation list", e) + return if (e is NonRetryableException) { + Result.failure() + } else { + Result.retry() + } + } + } + + companion object { + private const val TAG = "RevocationListPollingWorker" + + private const val WORK_NAME = "RevocationListPollingWorker" + + suspend fun schedule(context: Context) { + WorkManager.getInstance(context) + .enqueueUniquePeriodicWork(WORK_NAME, ExistingPeriodicWorkPolicy.KEEP, + PeriodicWorkRequestBuilder(Duration.ofMinutes(15)) + .setInitialDelay(0L, TimeUnit.MILLISECONDS) + .setBackoffCriteria(BackoffPolicy.EXPONENTIAL, Duration.ofSeconds(10)) + .setConstraints(Constraints(requiredNetworkType = NetworkType.CONNECTED)) + .build() + ) + .await() + } + + suspend fun cancel(context: Context) { + WorkManager.getInstance(context) + .cancelUniqueWork(WORK_NAME) + .await() + } + } +} \ No newline at end of file From d80fadfd12fe4139a511005f76e5638e4fd68f2b Mon Sep 17 00:00:00 2001 From: SessionHero01 <180888785+SessionHero01@users.noreply.github.com> Date: Fri, 21 Nov 2025 12:38:04 +1100 Subject: [PATCH 2/3] Updated polling logic --- .../securesms/auth/LoginStateRepository.kt | 16 +- .../securesms/debugmenu/DebugLogger.kt | 4 +- .../dependencies/OnAppStartupComponents.kt | 3 - .../securesms/pro/FetchProDetailsWorker.kt | 158 ++++++++++++++++ .../org/thoughtcrime/securesms/pro/Flows.kt | 21 --- .../securesms/pro/ProDetailsRepository.kt | 173 +++++------------- .../securesms/pro/ProProofGenerationWorker.kt | 114 ++++++++++++ .../securesms/pro/ProStatusManager.kt | 114 +++++++----- .../securesms/pro/db/ProDatabase.kt | 27 ++- 9 files changed, 418 insertions(+), 212 deletions(-) create mode 100644 app/src/main/java/org/thoughtcrime/securesms/pro/FetchProDetailsWorker.kt delete mode 100644 app/src/main/java/org/thoughtcrime/securesms/pro/Flows.kt create mode 100644 app/src/main/java/org/thoughtcrime/securesms/pro/ProProofGenerationWorker.kt diff --git a/app/src/main/java/org/thoughtcrime/securesms/auth/LoginStateRepository.kt b/app/src/main/java/org/thoughtcrime/securesms/auth/LoginStateRepository.kt index e69c8d1790..f8279dced4 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/auth/LoginStateRepository.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/auth/LoginStateRepository.kt @@ -160,14 +160,16 @@ class LoginStateRepository @Inject constructor( * Runs the provided [block] suspend function while the user is logged in, and cancels it * when logged out. */ - suspend fun runWhileLoggedIn(block: suspend () -> Unit) { - loggedInState - .map { it != null } - .collectLatest { loggedIn -> - if (loggedIn) { - block() + fun runWhileLoggedIn(scope: CoroutineScope, block: suspend () -> Unit) { + scope.launch { + loggedInState + .map { it != null } + .collectLatest { loggedIn -> + if (loggedIn) { + block() + } } - } + } } fun clear() { diff --git a/app/src/main/java/org/thoughtcrime/securesms/debugmenu/DebugLogger.kt b/app/src/main/java/org/thoughtcrime/securesms/debugmenu/DebugLogger.kt index 8197d585fd..5b4d3554ba 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/debugmenu/DebugLogger.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/debugmenu/DebugLogger.kt @@ -129,6 +129,6 @@ data class DebugLogData( enum class DebugLogGroup(val label: String, val color: Color){ AVATAR("Avatar", primaryOrange), - PRO_SUBSCRIPTION("Pro Subscription", primaryGreen), - PRO_DATA("Pro Data", primaryBlue) + PRO_SUBSCRIPTION("ProSubscription", primaryGreen), + PRO_DATA("ProData", primaryBlue) } \ No newline at end of file diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/OnAppStartupComponents.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/OnAppStartupComponents.kt index ef8a1b8c84..e1730e784e 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/OnAppStartupComponents.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/OnAppStartupComponents.kt @@ -20,7 +20,6 @@ import org.thoughtcrime.securesms.logging.PersistentLogger import org.thoughtcrime.securesms.migration.DatabaseMigrationManager import org.thoughtcrime.securesms.notifications.BackgroundPollManager import org.thoughtcrime.securesms.notifications.PushRegistrationHandler -import org.thoughtcrime.securesms.pro.RevocationListPoller import org.thoughtcrime.securesms.pro.ProStatusManager import org.thoughtcrime.securesms.pro.subscription.SubscriptionCoordinator import org.thoughtcrime.securesms.pro.subscription.SubscriptionManager @@ -71,7 +70,6 @@ class OnAppStartupComponents private constructor( avatarUploadManager: AvatarUploadManager, configToDatabaseSync: ConfigToDatabaseSync, subscriptionManagers: Set<@JvmSuppressWildcards SubscriptionManager>, - revocationListPoller: RevocationListPoller, ): this( components = listOf( configUploader, @@ -103,7 +101,6 @@ class OnAppStartupComponents private constructor( subscriptionCoordinator, avatarUploadManager, configToDatabaseSync, - revocationListPoller, ) + subscriptionManagers ) } diff --git a/app/src/main/java/org/thoughtcrime/securesms/pro/FetchProDetailsWorker.kt b/app/src/main/java/org/thoughtcrime/securesms/pro/FetchProDetailsWorker.kt new file mode 100644 index 0000000000..e8ccf0ae98 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/pro/FetchProDetailsWorker.kt @@ -0,0 +1,158 @@ +package org.thoughtcrime.securesms.pro + +import android.content.Context +import androidx.hilt.work.HiltWorker +import androidx.work.BackoffPolicy +import androidx.work.Constraints +import androidx.work.CoroutineWorker +import androidx.work.ExistingWorkPolicy +import androidx.work.NetworkType +import androidx.work.OneTimeWorkRequestBuilder +import androidx.work.WorkInfo +import androidx.work.WorkManager +import androidx.work.WorkQuery +import androidx.work.WorkerParameters +import androidx.work.await +import dagger.assisted.Assisted +import dagger.assisted.AssistedInject +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.mapNotNull +import org.session.libsession.snode.SnodeClock +import org.session.libsignal.exceptions.NonRetryableException +import org.session.libsignal.utilities.Log +import org.thoughtcrime.securesms.auth.LoginStateRepository +import org.thoughtcrime.securesms.pro.api.GetProDetailsRequest +import org.thoughtcrime.securesms.pro.api.ProApiExecutor +import org.thoughtcrime.securesms.pro.api.ProDetails +import org.thoughtcrime.securesms.pro.api.successOrThrow +import org.thoughtcrime.securesms.pro.db.ProDatabase +import java.time.Duration + +/** + * A worker that fetches the user's Pro details from the server and updates the local database. + * + * This worker doesn't do any business logic in terms of when to schedule itself, it simply performs + * the fetch and update operation regardlessly. It, however, does schedule the [ProProofGenerationWorker] + * if needed based on the fetched Pro details, this is because the proof generation logic + * is tightly coupled to the fetched Pro details state. + */ +@HiltWorker +class FetchProDetailsWorker @AssistedInject constructor( + @Assisted private val context: Context, + @Assisted params: WorkerParameters, + private val apiExecutor: ProApiExecutor, + private val getProDetailsRequestFactory: GetProDetailsRequest.Factory, + private val proDatabase: ProDatabase, + private val loginStateRepository: LoginStateRepository, + private val snodeClock: SnodeClock, +) : CoroutineWorker(context, params) { + override suspend fun doWork(): Result { + val proMasterKey = + requireNotNull(loginStateRepository.peekLoginState()?.seeded?.proMasterPrivateKey) { + "User must be logged in to fetch pro details" + } + + return try { + Log.d(TAG, "Fetching Pro details from server") + val details = apiExecutor.executeRequest( + request = getProDetailsRequestFactory.create(proMasterKey) + ).successOrThrow() + + Log.d( + TAG, + "Fetched pro details, status = ${details.status}, expiry = ${details.expiry}" + ) + + proDatabase.updateProDetails(proDetails = details, updatedAt = snodeClock.currentTime()) + + scheduleProofGenerationIfNeeded(details) + + Result.success() + } catch (e: CancellationException) { + Log.d(TAG, "Work cancelled") + throw e + } catch (e: NonRetryableException) { + Log.e(TAG, "Non-retryable error fetching pro details", e) + Result.failure() + } catch (e: Exception) { + Log.e(TAG, "Error fetching pro details", e) + Result.retry() + } + } + + + private suspend fun scheduleProofGenerationIfNeeded(details: ProDetails) { + val now = snodeClock.currentTimeMills() + + if (details.status != ProDetails.DETAILS_STATUS_ACTIVE) { + Log.d(TAG, "Pro is not active, clearing proof") + ProProofGenerationWorker.cancel(context) + proDatabase.updateCurrentProProof(null) + } else { + val currentProof = proDatabase.getCurrentProProof() + + if (currentProof == null || currentProof.expiryMs <= now) { + Log.d( + TAG, + "Pro is active but no valid proof found, scheduling proof generation now" + ) + ProProofGenerationWorker.schedule(context) + } else if (currentProof.expiryMs - now <= Duration.ofMinutes(60).toMillis() && + details.expiry!!.toEpochMilli() - now > Duration.ofMinutes(60).toMillis() && + details.autoRenewing == true + ) { + val delay = Duration.ofMinutes((Math.random() * 50 + 10).toLong()) + Log.d(TAG, "Pro proof is expiring soon, scheduling proof generation in $delay") + ProProofGenerationWorker.schedule(context, delay) + } else { + Log.d( + TAG, + "Pro proof is still valid for a long period, no need to schedule proof generation" + ) + } + } + } + + companion object { + private const val TAG = "FetchProDetailsWorker" + + fun watch(context: Context): Flow { + val workQuery = WorkQuery.Builder + .fromUniqueWorkNames(listOf(TAG)) + .build() + + return WorkManager.getInstance(context) + .getWorkInfosFlow(workQuery) + .mapNotNull { it.firstOrNull() } + } + + fun schedule( + context: Context, + existingWorkPolicy: ExistingWorkPolicy, + delay: Duration? = null + ) { + WorkManager.getInstance(context) + .enqueueUniqueWork( + uniqueWorkName = TAG, + existingWorkPolicy = existingWorkPolicy, + request = OneTimeWorkRequestBuilder() + .apply { + if (delay != null) { + setInitialDelay(delay) + } + } + .addTag(TAG) + .setConstraints(Constraints(requiredNetworkType = NetworkType.CONNECTED)) + .setBackoffCriteria(BackoffPolicy.EXPONENTIAL, Duration.ofSeconds(10)) + .build() + ) + } + + suspend fun cancel(context: Context) { + WorkManager.getInstance(context) + .cancelUniqueWork(TAG) + .await() + } + } +} \ No newline at end of file diff --git a/app/src/main/java/org/thoughtcrime/securesms/pro/Flows.kt b/app/src/main/java/org/thoughtcrime/securesms/pro/Flows.kt deleted file mode 100644 index 2bc23da363..0000000000 --- a/app/src/main/java/org/thoughtcrime/securesms/pro/Flows.kt +++ /dev/null @@ -1,21 +0,0 @@ -package org.thoughtcrime.securesms.pro - -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.flatMapLatest -import kotlinx.coroutines.flow.onStart -import org.session.libsession.utilities.TextSecurePreferences -import org.thoughtcrime.securesms.util.castAwayType - -/** - * Creates a flow that only emits when the debug flag forcePostPro is enabled. - */ -fun TextSecurePreferences.flowPostProLaunch(flowFactory: () -> Flow): Flow { - @Suppress("OPT_IN_USAGE") - return TextSecurePreferences.events - .filter { it == TextSecurePreferences.SET_FORCE_POST_PRO } - .castAwayType() - .onStart { emit(Unit) } - .filter { forcePostPro() } - .flatMapLatest { flowFactory() } -} diff --git a/app/src/main/java/org/thoughtcrime/securesms/pro/ProDetailsRepository.kt b/app/src/main/java/org/thoughtcrime/securesms/pro/ProDetailsRepository.kt index 3c3e6584f3..9b019ed715 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/pro/ProDetailsRepository.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/pro/ProDetailsRepository.kt @@ -1,45 +1,30 @@ package org.thoughtcrime.securesms.pro +import android.app.Application +import androidx.work.ExistingWorkPolicy +import androidx.work.WorkInfo import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.channels.BufferOverflow -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.combine import kotlinx.coroutines.flow.distinctUntilChanged -import kotlinx.coroutines.flow.first -import kotlinx.coroutines.flow.flatMapLatest -import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.mapNotNull +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.stateIn -import kotlinx.coroutines.selects.select -import kotlinx.coroutines.selects.onTimeout -import org.session.libsession.utilities.TextSecurePreferences +import org.session.libsession.snode.SnodeClock import org.session.libsignal.utilities.Log -import org.thoughtcrime.securesms.auth.LoginStateRepository import org.thoughtcrime.securesms.dependencies.ManagerScope -import org.thoughtcrime.securesms.pro.api.GetProDetailsRequest -import org.thoughtcrime.securesms.pro.api.ProApiExecutor import org.thoughtcrime.securesms.pro.api.ProDetails -import org.thoughtcrime.securesms.pro.api.successOrThrow import org.thoughtcrime.securesms.pro.db.ProDatabase -import org.thoughtcrime.securesms.util.NetworkConnectivity -import java.time.Duration import java.time.Instant import javax.inject.Inject import javax.inject.Singleton -import kotlin.coroutines.cancellation.CancellationException - -typealias ForceRefresh = Boolean @Singleton class ProDetailsRepository @Inject constructor( + private val application: Application, private val db: ProDatabase, - private val apiExecutor: ProApiExecutor, - private val getProDetailsRequestFactory: GetProDetailsRequest.Factory, - private val loginStateRepository: LoginStateRepository, - prefs: TextSecurePreferences, - networkConnectivity: NetworkConnectivity, + private val snodeClock: SnodeClock, @ManagerScope scope: CoroutineScope, ) { sealed interface LoadState { @@ -59,128 +44,52 @@ class ProDetailsRepository @Inject constructor( data class Error(override val lastUpdated: Pair?) : LoadState } - private val refreshRequests: SendChannel - - val loadState: StateFlow - - init { - val channel = Channel() - - refreshRequests = channel - @Suppress("OPT_IN_USAGE") - loadState = prefs.flowPostProLaunch { - loginStateRepository.loggedInState - .mapNotNull { it?.seeded?.proMasterPrivateKey } - }.distinctUntilChanged() - .flatMapLatest { proMasterKey -> - flow { - var last = db.getProDetailsAndLastUpdated() - var numRetried = 0 - var forceRefresh = false - - while (true) { - // Drain all pending requests as we are about to execute a request - while (true) { - val result = channel.tryReceive() - when { - result.isClosed -> { - Log.w(TAG, "Refresh channel closed, stopping Pro details fetcher") - return@flow - } - - result.isSuccess -> { - forceRefresh = forceRefresh || result.getOrThrow() - } - - else -> break - } - } - - var retryingAt: Instant? = null - - if (!forceRefresh && last != null - && last.second.plusSeconds(MIN_UPDATE_INTERVAL_SECONDS) >= Instant.now()) { - Log.d(TAG, "Pro details is fresh enough, skipping fetch") - // Last update was recent enough, skip fetching - emit(LoadState.Loaded(last)) - } else { - if (!networkConnectivity.networkAvailable.value) { - // No network...mark the state and wait for the network to be online - emit(LoadState.Loading(last, waitingForNetwork = true)) - - networkConnectivity.networkAvailable.first { it } - - // We might have waited a while for the network to come back - // so drain the refresh requests again to avoid blocking the requesters - // for too long - while (channel.tryReceive().isSuccess) {} - } - - emit(LoadState.Loading(last, waitingForNetwork = false)) - - // Fetch new details - try { - Log.d(TAG, "Start fetching Pro details from backend") - last = apiExecutor.executeRequest( - request = getProDetailsRequestFactory.create(proMasterKey) - ).successOrThrow() to Instant.now() - - db.updateProDetails(last.first, last.second) - - Log.d(TAG, "Successfully fetched Pro details from backend") - emit(LoadState.Loaded(last)) - numRetried = 0 - } catch (e: Exception) { - if (e is CancellationException) throw e - - emit(LoadState.Error(last)) - - // Exponential backoff for retries, capped at 2 minutes - val delaySeconds = minOf(10L * (1L shl numRetried), 120L) - Log.e(TAG, "Error fetching Pro details from backend, retrying in ${delaySeconds}s", e) - - retryingAt = Instant.now().plusSeconds(delaySeconds) - numRetried++ - } - - forceRefresh = false - } + val loadState: StateFlow = combine( + FetchProDetailsWorker.watch(application) + .map { it.state } + .distinctUntilChanged(), + + db.proDetailsChangeNotification + .onStart { emit(Unit) } + .map { db.getProDetailsAndLastUpdated() } + ) { state, last -> + when (state) { + WorkInfo.State.ENQUEUED, WorkInfo.State.BLOCKED -> LoadState.Loading(last, waitingForNetwork = true) + WorkInfo.State.RUNNING -> LoadState.Loading(last, waitingForNetwork = false) + WorkInfo.State.SUCCEEDED -> { + if (last != null) { + LoadState.Loaded(last) + } else { + // This should never happen, but just in case... + LoadState.Error(null) + } + } - // Wait until either a refresh is requested, or it's time to retry - select { - refreshRequests.onReceive { - Log.d(TAG, "Manual refresh requested: force = $it") - forceRefresh = it - } + WorkInfo.State.FAILED, WorkInfo.State.CANCELLED -> LoadState.Error(last) + } + }.stateIn(scope, SharingStarted.Eagerly, LoadState.Init) - if (retryingAt != null) { - val delayMillis = - Duration.between(Instant.now(), retryingAt).toMillis() - onTimeout(delayMillis) { - Log.d(TAG, "Retrying Pro details fetch after delay") - } - } - } - } - } - }.stateIn(scope, SharingStarted.Eagerly, LoadState.Init) - } /** * Requests a fresh of current user's pro details. By default, if last update is recent enough, * no network request will be made. If [force] is true, a network request will be * made regardless of the freshness of the last update. */ - suspend fun requestRefresh(force: Boolean = false) { - if ((loadState.value as? LoadState.Loading)?.waitingForNetwork == true) { - Log.d(TAG, "Currently waiting for network for a fetch, no need to send another request") + fun requestRefresh(force: Boolean = false) { + val currentState = loadState.value + if (!force && (currentState is LoadState.Loading || currentState is LoadState.Loaded) && + currentState.lastUpdated?.second?.plusSeconds(MIN_UPDATE_INTERVAL_SECONDS) + ?.isBefore(snodeClock.currentTime()) == true) { + Log.d(TAG, "Pro details are fresh enough, skipping refresh") return } - refreshRequests.send(force) + Log.d(TAG, "Scheduling fetch of Pro details from server") + FetchProDetailsWorker.schedule(application, ExistingWorkPolicy.KEEP) } + companion object { private const val TAG = "ProDetailsRepository" private const val MIN_UPDATE_INTERVAL_SECONDS = 120L diff --git a/app/src/main/java/org/thoughtcrime/securesms/pro/ProProofGenerationWorker.kt b/app/src/main/java/org/thoughtcrime/securesms/pro/ProProofGenerationWorker.kt new file mode 100644 index 0000000000..5743121bec --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/pro/ProProofGenerationWorker.kt @@ -0,0 +1,114 @@ +package org.thoughtcrime.securesms.pro + +import android.content.Context +import androidx.hilt.work.HiltWorker +import androidx.work.BackoffPolicy +import androidx.work.Constraints +import androidx.work.CoroutineWorker +import androidx.work.ExistingWorkPolicy +import androidx.work.NetworkType +import androidx.work.OneTimeWorkRequestBuilder +import androidx.work.WorkManager +import androidx.work.WorkerParameters +import androidx.work.await +import dagger.assisted.Assisted +import dagger.assisted.AssistedInject +import kotlinx.coroutines.CancellationException +import org.session.libsession.snode.OnionRequestAPI +import org.session.libsession.snode.SnodeClock +import org.session.libsignal.exceptions.NonRetryableException +import org.session.libsignal.utilities.Log +import org.thoughtcrime.securesms.auth.LoginStateRepository +import org.thoughtcrime.securesms.pro.api.GenerateProProofRequest +import org.thoughtcrime.securesms.pro.api.ProApiExecutor +import org.thoughtcrime.securesms.pro.api.ProDetails +import org.thoughtcrime.securesms.pro.api.successOrThrow +import org.thoughtcrime.securesms.pro.db.ProDatabase +import org.thoughtcrime.securesms.util.getRootCause +import java.time.Duration +import java.time.Instant + +/** + * A worker that generates a new [network.loki.messenger.libsession_util.pro.ProProof] and stores it + * locally. + * + * Normally you don't need to interact with this worker directly, as it is scheduled + * automatically when needed based on the Pro details state, by the [FetchProDetailsWorker]. + */ +@HiltWorker +class ProProofGenerationWorker @AssistedInject constructor( + @Assisted context: Context, + @Assisted params: WorkerParameters, + private val proApiExecutor: ProApiExecutor, + private val proDatabase: ProDatabase, + private val generateProProofRequest: GenerateProProofRequest.Factory, + private val proDetailsRepository: ProDetailsRepository, + private val loginStateRepository: LoginStateRepository, + private val snodeClock: SnodeClock, +) : CoroutineWorker(context, params) { + override suspend fun doWork(): Result { + val proMasterKey = requireNotNull(loginStateRepository.peekLoginState()?.seeded?.proMasterPrivateKey) { + "User must be logged to generate proof" + } + + val details = checkNotNull(proDetailsRepository.loadState.value.lastUpdated) { + "Pro details must be available to generate proof" + } + + check(details.first.status == ProDetails.DETAILS_STATUS_ACTIVE) { + "Pro status must be active to generate proof" + } + + return try { + val proof = proApiExecutor.executeRequest( + request = generateProProofRequest.create( + masterPrivateKey = proMasterKey, + rotatingPrivateKey = proDatabase.ensureValidRotatingKeys(snodeClock.currentTime()).ed25519PrivKey + ) + ).successOrThrow() + + proDatabase.updateCurrentProProof(proof) + + Log.d(WORK_NAME, "Successfully generated a new pro proof expiring at ${Instant.ofEpochMilli(proof.expiryMs)}") + Result.success() + } catch (e: Exception) { + if (e is CancellationException) throw e + + Log.e(WORK_NAME, "Error generating Pro proof", e) + if (e is NonRetryableException || + // HTTP 403 indicates that the user is not + e.getRootCause()?.statusCode == 403) { + Result.failure() + } else { + Result.retry() + } + } + } + + companion object { + private const val WORK_NAME = "ProProofGenerationWorker" + + suspend fun schedule(context: Context, delay: Duration? = null) { + WorkManager.getInstance(context) + .enqueueUniqueWork(WORK_NAME, + ExistingWorkPolicy.REPLACE, + OneTimeWorkRequestBuilder() + .apply { + if (delay != null) { + setInitialDelay(delay) + } + } + .setConstraints(Constraints(requiredNetworkType = NetworkType.CONNECTED)) + .setBackoffCriteria(BackoffPolicy.EXPONENTIAL, Duration.ofSeconds(10)) + .build() + ) + .await() + } + + suspend fun cancel(context: Context) { + WorkManager.getInstance(context) + .cancelUniqueWork(WORK_NAME) + .await() + } + } +} \ No newline at end of file diff --git a/app/src/main/java/org/thoughtcrime/securesms/pro/ProStatusManager.kt b/app/src/main/java/org/thoughtcrime/securesms/pro/ProStatusManager.kt index cac1c1cda9..8c75d23255 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/pro/ProStatusManager.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/pro/ProStatusManager.kt @@ -1,5 +1,7 @@ package org.thoughtcrime.securesms.pro +import android.app.Application +import dagger.Lazy import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi @@ -12,12 +14,16 @@ import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.combine import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.mapLatest import kotlinx.coroutines.flow.mapNotNull +import kotlinx.coroutines.flow.merge import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.stateIn import kotlinx.coroutines.flow.update import kotlinx.coroutines.launch +import kotlinx.coroutines.time.delay import kotlinx.coroutines.withTimeout import network.loki.messenger.libsession_util.pro.BackendRequests import network.loki.messenger.libsession_util.pro.BackendRequests.PAYMENT_PROVIDER_APP_STORE @@ -25,8 +31,11 @@ import network.loki.messenger.libsession_util.pro.BackendRequests.PAYMENT_PROVID import network.loki.messenger.libsession_util.protocol.ProFeatures import org.session.libsession.messaging.messages.visible.VisibleMessage import org.session.libsession.snode.SnodeClock +import org.session.libsession.utilities.ConfigFactoryProtocol import org.session.libsession.utilities.TextSecurePreferences +import org.session.libsession.utilities.UserConfigType import org.session.libsession.utilities.recipients.Recipient +import org.session.libsession.utilities.userConfigsChanged import org.session.libsignal.utilities.Log import org.thoughtcrime.securesms.auth.LoginStateRepository import org.thoughtcrime.securesms.database.RecipientRepository @@ -38,21 +47,22 @@ import org.thoughtcrime.securesms.dependencies.ManagerScope import org.thoughtcrime.securesms.dependencies.OnAppStartupComponent import org.thoughtcrime.securesms.pro.api.AddPaymentErrorStatus import org.thoughtcrime.securesms.pro.api.AddProPaymentRequest -import org.thoughtcrime.securesms.pro.api.GenerateProProofRequest import org.thoughtcrime.securesms.pro.api.ProApiExecutor import org.thoughtcrime.securesms.pro.api.ProApiResponse -import org.thoughtcrime.securesms.pro.api.successOrThrow import org.thoughtcrime.securesms.pro.db.ProDatabase import org.thoughtcrime.securesms.pro.subscription.ProSubscriptionDuration import org.thoughtcrime.securesms.pro.subscription.SubscriptionManager import org.thoughtcrime.securesms.util.State import java.time.Duration import java.time.Instant +import java.util.EnumSet import javax.inject.Inject import javax.inject.Singleton +@OptIn(ExperimentalCoroutinesApi::class) @Singleton class ProStatusManager @Inject constructor( + private val application: Application, private val prefs: TextSecurePreferences, recipientRepository: RecipientRepository, @param:ManagerScope private val scope: CoroutineScope, @@ -61,7 +71,7 @@ class ProStatusManager @Inject constructor( private val proDatabase: ProDatabase, private val snodeClock: SnodeClock, private val proDetailsRepository: ProDetailsRepository, - private val generateProProofRequest: GenerateProProofRequest.Factory, + private val configFactory: Lazy, ) : OnAppStartupComponent { val proDataState: StateFlow = combine( @@ -165,52 +175,72 @@ class ProStatusManager @Inject constructor( } } - // Manage pro proof based on - scope.launch { - combine( - loginState.loggedInState.mapNotNull { it?.seeded?.proMasterPrivateKey }, - proDetailsRepository.loadState - .filter { it != ProDetailsRepository.LoadState.Init } - .map { it.lastUpdated?.first?.expiry }, - ::Pair - ).collectLatest { (proMasterKey, proValidUntil) -> - val currentProof = proDatabase.getCurrentProProof() - - val now = snodeClock.currentTime() - if (proValidUntil != null && proValidUntil > now) { - if (currentProof != null) { - val proofExpiry = Instant.ofEpochMilli(currentProof.expiryMs) - - val renewProofAt = if (proofExpiry < now) { - if (Duration.between(now, proValidUntil).toMinutes() < 60) { - now - } else { - proValidUntil - Duration.ofMinutes(10 + (Math.random() * 50).toLong()) - } - } else { - now - } - - Log.d(DebugLogGroup.PRO_SUBSCRIPTION.label, "Scheduling pro proof renewal at $renewProofAt") - if (renewProofAt > now) { - delay(Duration.between(now, renewProofAt).toMillis()) - } + loginState.runWhileLoggedIn(scope) { + postProLaunchStatus + .collectLatest { postLaunch -> + if (postLaunch) { + RevocationListPollingWorker.schedule(application) + } else { + RevocationListPollingWorker.cancel(application) } + } + } - Log.d(DebugLogGroup.PRO_SUBSCRIPTION.label, "Generating new pro proof") - try { - val proof = apiExecutor.executeRequest(request = generateProProofRequest.create( - masterPrivateKey = proMasterKey, - rotatingPrivateKey = proDatabase.ensureValidRotatingKeys(now).ed25519PrivKey - )).successOrThrow() - } catch (e: Exception) { - TODO("Not yet implemented") + // ProDetails worker lifecycle + loginState.runWhileLoggedIn(scope) { + postProLaunchStatus + .collectLatest { postLaunch -> + if (postLaunch) { + merge( + configFactory.get().userConfigsChanged(EnumSet.of(UserConfigType.USER_PROFILE)) + .map { "UserProfile changes" }, //TODO: also schedule it when the real config changes (not integrated yet) + + proDetailsRepository.loadState + .mapNotNull { it.lastUpdated?.first?.expiry } + .distinctUntilChanged() + .mapLatest { expiry -> + // Schedule a refresh 30seconds after access expiry + val refreshTime = expiry.plusSeconds(30) + + val now = snodeClock.currentTime() + if (now < refreshTime) { + val duration = Duration.between(now, refreshTime) + Log.d(DebugLogGroup.PRO_SUBSCRIPTION.label, "Delaying ProDetails refresh until $refreshTime due to access expiry") + delay(duration) + } + + "ProDetails expiry reached" + }, + + proDatabase.currentProProofChangesNotification + .onStart { emit(Unit) } + .mapNotNull { proDatabase.getCurrentProProof()?.expiryMs?.let(Instant::ofEpochMilli) } + .mapLatest { expiry -> + // Schedule a refresh for a random number between 10 and 60 minutes before proof expiry + val now = snodeClock.currentTime() + + val refreshTime = expiry.minus(Duration.ofMinutes((10..60).random().toLong())) + + if (now < refreshTime) { + Log.d(DebugLogGroup.PRO_SUBSCRIPTION.label, "Delaying ProDetails refresh until $refreshTime due to proof expiry") + delay(Duration.between(now, expiry)) + } + }, + + flowOf("App starting up") + ).collect { refreshReason -> + Log.d(DebugLogGroup.PRO_SUBSCRIPTION.label, "Scheduling ProDetails fetch due to: $refreshReason") + + proDetailsRepository.requestRefresh() + } + } else { + FetchProDetailsWorker.cancel(application) } } - } } } + /** * Logic to determine if we should animate the avatar for a user or freeze it on the first frame */ diff --git a/app/src/main/java/org/thoughtcrime/securesms/pro/db/ProDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/pro/db/ProDatabase.kt index 5497967abe..7144182e8d 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/pro/db/ProDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/pro/db/ProDatabase.kt @@ -230,6 +230,13 @@ class ProDatabase @Inject constructor( } } + private val mutableProDetailsChangeNotification = MutableSharedFlow( + extraBufferCapacity = 10, + onBufferOverflow = BufferOverflow.DROP_OLDEST + ) + + val proDetailsChangeNotification: SharedFlow get() = mutableProDetailsChangeNotification + fun getProDetailsAndLastUpdated(): Pair? { return readableDatabase.rawQuery(""" SELECT name, value FROM pro_state @@ -255,12 +262,22 @@ class ProDatabase @Inject constructor( } fun updateProDetails(proDetails: ProDetails, updatedAt: Instant) { - //language=roomsql - writableDatabase.rawExecSQL(""" - INSERT OR REPLACE INTO pro_state (name, value) + val changes = writableDatabase.compileStatement(""" + INSERT INTO pro_state (name, value) VALUES (?, ?), (?, ?) - """, STATE_PRO_DETAILS, json.encodeToString(proDetails), - STATE_PRO_DETAILS_UPDATED_AT, updatedAt.toEpochMilli().toString()) + ON CONFLICT DO UPDATE SET value=excluded.value + WHERE value != excluded.value + """).use { stmt -> + stmt.bindString(1, STATE_PRO_DETAILS) + stmt.bindString(2, json.encodeToString(proDetails)) + stmt.bindString(3, STATE_PRO_DETAILS_UPDATED_AT) + stmt.bindString(4, updatedAt.toEpochMilli().toString()) + stmt.executeUpdateDelete() + } + + if (changes > 0) { + mutableProDetailsChangeNotification.tryEmit(Unit) + } } @Serializable From 6f0b119c36227e6d45b9f71b00436347fcab99fc Mon Sep 17 00:00:00 2001 From: SessionHero01 <180888785+SessionHero01@users.noreply.github.com> Date: Fri, 21 Nov 2025 12:46:42 +1100 Subject: [PATCH 3/3] Clean up --- .../thoughtcrime/securesms/pro/ProProofs.kt | 1 - .../securesms/pro/ProStatusManager.kt | 64 +++++++++++++++++-- .../pro/RevocationListPollingWorker.kt | 3 + 3 files changed, 60 insertions(+), 8 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/pro/ProProofs.kt b/app/src/main/java/org/thoughtcrime/securesms/pro/ProProofs.kt index fc9af41c44..1d13b218f4 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/pro/ProProofs.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/pro/ProProofs.kt @@ -3,7 +3,6 @@ package org.thoughtcrime.securesms.pro import com.google.protobuf.ByteString import network.loki.messenger.libsession_util.pro.ProProof import org.session.libsignal.protos.SignalServiceProtos -import java.time.Instant /** * Copies values from a libsession ProProof into a protobuf-based ProProof. diff --git a/app/src/main/java/org/thoughtcrime/securesms/pro/ProStatusManager.kt b/app/src/main/java/org/thoughtcrime/securesms/pro/ProStatusManager.kt index 8c75d23255..1a84c4b42b 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/pro/ProStatusManager.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/pro/ProStatusManager.kt @@ -14,6 +14,7 @@ import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.combine import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.filterNotNull import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.mapLatest @@ -186,13 +187,18 @@ class ProStatusManager @Inject constructor( } } - // ProDetails worker lifecycle + manageProDetailsRefreshScheduling() + manageCurrentProProofRevocation() + } + + private fun manageProDetailsRefreshScheduling() { loginState.runWhileLoggedIn(scope) { postProLaunchStatus .collectLatest { postLaunch -> if (postLaunch) { merge( - configFactory.get().userConfigsChanged(EnumSet.of(UserConfigType.USER_PROFILE)) + configFactory.get() + .userConfigsChanged(EnumSet.of(UserConfigType.USER_PROFILE)) .map { "UserProfile changes" }, //TODO: also schedule it when the real config changes (not integrated yet) proDetailsRepository.loadState @@ -205,7 +211,10 @@ class ProStatusManager @Inject constructor( val now = snodeClock.currentTime() if (now < refreshTime) { val duration = Duration.between(now, refreshTime) - Log.d(DebugLogGroup.PRO_SUBSCRIPTION.label, "Delaying ProDetails refresh until $refreshTime due to access expiry") + Log.d( + DebugLogGroup.PRO_SUBSCRIPTION.label, + "Delaying ProDetails refresh until $refreshTime due to access expiry" + ) delay(duration) } @@ -214,22 +223,33 @@ class ProStatusManager @Inject constructor( proDatabase.currentProProofChangesNotification .onStart { emit(Unit) } - .mapNotNull { proDatabase.getCurrentProProof()?.expiryMs?.let(Instant::ofEpochMilli) } + .mapNotNull { + proDatabase.getCurrentProProof()?.expiryMs?.let( + Instant::ofEpochMilli + ) + } .mapLatest { expiry -> // Schedule a refresh for a random number between 10 and 60 minutes before proof expiry val now = snodeClock.currentTime() - val refreshTime = expiry.minus(Duration.ofMinutes((10..60).random().toLong())) + val refreshTime = + expiry.minus(Duration.ofMinutes((10..60).random().toLong())) if (now < refreshTime) { - Log.d(DebugLogGroup.PRO_SUBSCRIPTION.label, "Delaying ProDetails refresh until $refreshTime due to proof expiry") + Log.d( + DebugLogGroup.PRO_SUBSCRIPTION.label, + "Delaying ProDetails refresh until $refreshTime due to proof expiry" + ) delay(Duration.between(now, expiry)) } }, flowOf("App starting up") ).collect { refreshReason -> - Log.d(DebugLogGroup.PRO_SUBSCRIPTION.label, "Scheduling ProDetails fetch due to: $refreshReason") + Log.d( + DebugLogGroup.PRO_SUBSCRIPTION.label, + "Scheduling ProDetails fetch due to: $refreshReason" + ) proDetailsRepository.requestRefresh() } @@ -240,6 +260,36 @@ class ProStatusManager @Inject constructor( } } + private fun manageCurrentProProofRevocation() { + loginState.runWhileLoggedIn(scope) { + postProLaunchStatus.collectLatest { postLaunch -> + if (postLaunch) { + combine( + proDatabase.currentProProofChangesNotification + .onStart { emit(Unit) } + .mapNotNull { proDatabase.getCurrentProProof()?.genIndexHashHex }, + + proDatabase.revocationChangeNotification + .onStart { emit(Unit) }, + + { proofGenIndexHash, _ -> + proofGenIndexHash.takeIf { proDatabase.isRevoked(it) } + } + ) + .filterNotNull() + .collectLatest { revokedHash -> + if (revokedHash == proDatabase.getCurrentProProof()?.genIndexHashHex) { + Log.w( + DebugLogGroup.PRO_SUBSCRIPTION.label, + "Current Pro proof has been revoked, clearing local proof" + ) + proDatabase.updateCurrentProProof(null) + } + } + } + } + } + } /** * Logic to determine if we should animate the avatar for a user or freeze it on the first frame diff --git a/app/src/main/java/org/thoughtcrime/securesms/pro/RevocationListPollingWorker.kt b/app/src/main/java/org/thoughtcrime/securesms/pro/RevocationListPollingWorker.kt index 997d061fa5..6c4b238fd6 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/pro/RevocationListPollingWorker.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/pro/RevocationListPollingWorker.kt @@ -24,6 +24,9 @@ import java.time.Duration import java.util.concurrent.TimeUnit import kotlin.coroutines.cancellation.CancellationException +/** + * A long running worker which periodically polls the revocation list and updates the local database. + */ @HiltWorker class RevocationListPollingWorker @AssistedInject constructor( @Assisted context: Context,