diff --git a/app/src/main/java/org/thoughtcrime/securesms/auth/LoginStateRepository.kt b/app/src/main/java/org/thoughtcrime/securesms/auth/LoginStateRepository.kt index e69c8d1790..f8279dced4 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/auth/LoginStateRepository.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/auth/LoginStateRepository.kt @@ -160,14 +160,16 @@ class LoginStateRepository @Inject constructor( * Runs the provided [block] suspend function while the user is logged in, and cancels it * when logged out. */ - suspend fun runWhileLoggedIn(block: suspend () -> Unit) { - loggedInState - .map { it != null } - .collectLatest { loggedIn -> - if (loggedIn) { - block() + fun runWhileLoggedIn(scope: CoroutineScope, block: suspend () -> Unit) { + scope.launch { + loggedInState + .map { it != null } + .collectLatest { loggedIn -> + if (loggedIn) { + block() + } } - } + } } fun clear() { diff --git a/app/src/main/java/org/thoughtcrime/securesms/debugmenu/DebugLogger.kt b/app/src/main/java/org/thoughtcrime/securesms/debugmenu/DebugLogger.kt index 8197d585fd..5b4d3554ba 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/debugmenu/DebugLogger.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/debugmenu/DebugLogger.kt @@ -129,6 +129,6 @@ data class DebugLogData( enum class DebugLogGroup(val label: String, val color: Color){ AVATAR("Avatar", primaryOrange), - PRO_SUBSCRIPTION("Pro Subscription", primaryGreen), - PRO_DATA("Pro Data", primaryBlue) + PRO_SUBSCRIPTION("ProSubscription", primaryGreen), + PRO_DATA("ProData", primaryBlue) } \ No newline at end of file diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/OnAppStartupComponents.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/OnAppStartupComponents.kt index c0b136e603..e1730e784e 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/OnAppStartupComponents.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/OnAppStartupComponents.kt @@ -20,7 +20,6 @@ import org.thoughtcrime.securesms.logging.PersistentLogger import org.thoughtcrime.securesms.migration.DatabaseMigrationManager import org.thoughtcrime.securesms.notifications.BackgroundPollManager import org.thoughtcrime.securesms.notifications.PushRegistrationHandler -import org.thoughtcrime.securesms.pro.ProStatePoller import org.thoughtcrime.securesms.pro.ProStatusManager import org.thoughtcrime.securesms.pro.subscription.SubscriptionCoordinator import org.thoughtcrime.securesms.pro.subscription.SubscriptionManager @@ -71,7 +70,6 @@ class OnAppStartupComponents private constructor( avatarUploadManager: AvatarUploadManager, configToDatabaseSync: ConfigToDatabaseSync, subscriptionManagers: Set<@JvmSuppressWildcards SubscriptionManager>, - proStatePoller: ProStatePoller, ): this( components = listOf( configUploader, @@ -103,7 +101,6 @@ class OnAppStartupComponents private constructor( subscriptionCoordinator, avatarUploadManager, configToDatabaseSync, - proStatePoller, ) + subscriptionManagers ) } diff --git a/app/src/main/java/org/thoughtcrime/securesms/preferences/SettingsViewModel.kt b/app/src/main/java/org/thoughtcrime/securesms/preferences/SettingsViewModel.kt index ab9cce4dfc..346526d59b 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/preferences/SettingsViewModel.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/preferences/SettingsViewModel.kt @@ -156,7 +156,9 @@ class SettingsViewModel @Inject constructor( } } - proDetailsRepository.requestRefresh() + viewModelScope.launch { + proDetailsRepository.requestRefresh() + } } private fun getVersionNumber(): CharSequence { diff --git a/app/src/main/java/org/thoughtcrime/securesms/pro/FetchProDetailsWorker.kt b/app/src/main/java/org/thoughtcrime/securesms/pro/FetchProDetailsWorker.kt new file mode 100644 index 0000000000..e8ccf0ae98 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/pro/FetchProDetailsWorker.kt @@ -0,0 +1,158 @@ +package org.thoughtcrime.securesms.pro + +import android.content.Context +import androidx.hilt.work.HiltWorker +import androidx.work.BackoffPolicy +import androidx.work.Constraints +import androidx.work.CoroutineWorker +import androidx.work.ExistingWorkPolicy +import androidx.work.NetworkType +import androidx.work.OneTimeWorkRequestBuilder +import androidx.work.WorkInfo +import androidx.work.WorkManager +import androidx.work.WorkQuery +import androidx.work.WorkerParameters +import androidx.work.await +import dagger.assisted.Assisted +import dagger.assisted.AssistedInject +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.mapNotNull +import org.session.libsession.snode.SnodeClock +import org.session.libsignal.exceptions.NonRetryableException +import org.session.libsignal.utilities.Log +import org.thoughtcrime.securesms.auth.LoginStateRepository +import org.thoughtcrime.securesms.pro.api.GetProDetailsRequest +import org.thoughtcrime.securesms.pro.api.ProApiExecutor +import org.thoughtcrime.securesms.pro.api.ProDetails +import org.thoughtcrime.securesms.pro.api.successOrThrow +import org.thoughtcrime.securesms.pro.db.ProDatabase +import java.time.Duration + +/** + * A worker that fetches the user's Pro details from the server and updates the local database. + * + * This worker doesn't do any business logic in terms of when to schedule itself, it simply performs + * the fetch and update operation regardlessly. It, however, does schedule the [ProProofGenerationWorker] + * if needed based on the fetched Pro details, this is because the proof generation logic + * is tightly coupled to the fetched Pro details state. + */ +@HiltWorker +class FetchProDetailsWorker @AssistedInject constructor( + @Assisted private val context: Context, + @Assisted params: WorkerParameters, + private val apiExecutor: ProApiExecutor, + private val getProDetailsRequestFactory: GetProDetailsRequest.Factory, + private val proDatabase: ProDatabase, + private val loginStateRepository: LoginStateRepository, + private val snodeClock: SnodeClock, +) : CoroutineWorker(context, params) { + override suspend fun doWork(): Result { + val proMasterKey = + requireNotNull(loginStateRepository.peekLoginState()?.seeded?.proMasterPrivateKey) { + "User must be logged in to fetch pro details" + } + + return try { + Log.d(TAG, "Fetching Pro details from server") + val details = apiExecutor.executeRequest( + request = getProDetailsRequestFactory.create(proMasterKey) + ).successOrThrow() + + Log.d( + TAG, + "Fetched pro details, status = ${details.status}, expiry = ${details.expiry}" + ) + + proDatabase.updateProDetails(proDetails = details, updatedAt = snodeClock.currentTime()) + + scheduleProofGenerationIfNeeded(details) + + Result.success() + } catch (e: CancellationException) { + Log.d(TAG, "Work cancelled") + throw e + } catch (e: NonRetryableException) { + Log.e(TAG, "Non-retryable error fetching pro details", e) + Result.failure() + } catch (e: Exception) { + Log.e(TAG, "Error fetching pro details", e) + Result.retry() + } + } + + + private suspend fun scheduleProofGenerationIfNeeded(details: ProDetails) { + val now = snodeClock.currentTimeMills() + + if (details.status != ProDetails.DETAILS_STATUS_ACTIVE) { + Log.d(TAG, "Pro is not active, clearing proof") + ProProofGenerationWorker.cancel(context) + proDatabase.updateCurrentProProof(null) + } else { + val currentProof = proDatabase.getCurrentProProof() + + if (currentProof == null || currentProof.expiryMs <= now) { + Log.d( + TAG, + "Pro is active but no valid proof found, scheduling proof generation now" + ) + ProProofGenerationWorker.schedule(context) + } else if (currentProof.expiryMs - now <= Duration.ofMinutes(60).toMillis() && + details.expiry!!.toEpochMilli() - now > Duration.ofMinutes(60).toMillis() && + details.autoRenewing == true + ) { + val delay = Duration.ofMinutes((Math.random() * 50 + 10).toLong()) + Log.d(TAG, "Pro proof is expiring soon, scheduling proof generation in $delay") + ProProofGenerationWorker.schedule(context, delay) + } else { + Log.d( + TAG, + "Pro proof is still valid for a long period, no need to schedule proof generation" + ) + } + } + } + + companion object { + private const val TAG = "FetchProDetailsWorker" + + fun watch(context: Context): Flow { + val workQuery = WorkQuery.Builder + .fromUniqueWorkNames(listOf(TAG)) + .build() + + return WorkManager.getInstance(context) + .getWorkInfosFlow(workQuery) + .mapNotNull { it.firstOrNull() } + } + + fun schedule( + context: Context, + existingWorkPolicy: ExistingWorkPolicy, + delay: Duration? = null + ) { + WorkManager.getInstance(context) + .enqueueUniqueWork( + uniqueWorkName = TAG, + existingWorkPolicy = existingWorkPolicy, + request = OneTimeWorkRequestBuilder() + .apply { + if (delay != null) { + setInitialDelay(delay) + } + } + .addTag(TAG) + .setConstraints(Constraints(requiredNetworkType = NetworkType.CONNECTED)) + .setBackoffCriteria(BackoffPolicy.EXPONENTIAL, Duration.ofSeconds(10)) + .build() + ) + } + + suspend fun cancel(context: Context) { + WorkManager.getInstance(context) + .cancelUniqueWork(TAG) + .await() + } + } +} \ No newline at end of file diff --git a/app/src/main/java/org/thoughtcrime/securesms/pro/Flows.kt b/app/src/main/java/org/thoughtcrime/securesms/pro/Flows.kt deleted file mode 100644 index 2bc23da363..0000000000 --- a/app/src/main/java/org/thoughtcrime/securesms/pro/Flows.kt +++ /dev/null @@ -1,21 +0,0 @@ -package org.thoughtcrime.securesms.pro - -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.flatMapLatest -import kotlinx.coroutines.flow.onStart -import org.session.libsession.utilities.TextSecurePreferences -import org.thoughtcrime.securesms.util.castAwayType - -/** - * Creates a flow that only emits when the debug flag forcePostPro is enabled. - */ -fun TextSecurePreferences.flowPostProLaunch(flowFactory: () -> Flow): Flow { - @Suppress("OPT_IN_USAGE") - return TextSecurePreferences.events - .filter { it == TextSecurePreferences.SET_FORCE_POST_PRO } - .castAwayType() - .onStart { emit(Unit) } - .filter { forcePostPro() } - .flatMapLatest { flowFactory() } -} diff --git a/app/src/main/java/org/thoughtcrime/securesms/pro/ProDetailsRepository.kt b/app/src/main/java/org/thoughtcrime/securesms/pro/ProDetailsRepository.kt index a901b75a62..9b019ed715 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/pro/ProDetailsRepository.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/pro/ProDetailsRepository.kt @@ -1,43 +1,30 @@ package org.thoughtcrime.securesms.pro +import android.app.Application +import androidx.work.ExistingWorkPolicy +import androidx.work.WorkInfo import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.channels.BufferOverflow -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.combine import kotlinx.coroutines.flow.distinctUntilChanged -import kotlinx.coroutines.flow.first -import kotlinx.coroutines.flow.flatMapLatest -import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.mapNotNull +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.stateIn -import kotlinx.coroutines.selects.select -import kotlinx.coroutines.selects.onTimeout -import org.session.libsession.utilities.TextSecurePreferences +import org.session.libsession.snode.SnodeClock import org.session.libsignal.utilities.Log -import org.thoughtcrime.securesms.auth.LoginStateRepository import org.thoughtcrime.securesms.dependencies.ManagerScope -import org.thoughtcrime.securesms.pro.api.GetProDetailsRequest -import org.thoughtcrime.securesms.pro.api.ProApiExecutor import org.thoughtcrime.securesms.pro.api.ProDetails -import org.thoughtcrime.securesms.pro.api.successOrThrow import org.thoughtcrime.securesms.pro.db.ProDatabase -import org.thoughtcrime.securesms.util.NetworkConnectivity -import java.time.Duration import java.time.Instant import javax.inject.Inject import javax.inject.Singleton -import kotlin.coroutines.cancellation.CancellationException @Singleton class ProDetailsRepository @Inject constructor( + private val application: Application, private val db: ProDatabase, - private val apiExecutor: ProApiExecutor, - private val getProDetailsRequestFactory: GetProDetailsRequest.Factory, - private val loginStateRepository: LoginStateRepository, - private val prefs: TextSecurePreferences, - networkConnectivity: NetworkConnectivity, + private val snodeClock: SnodeClock, @ManagerScope scope: CoroutineScope, ) { sealed interface LoadState { @@ -57,92 +44,51 @@ class ProDetailsRepository @Inject constructor( data class Error(override val lastUpdated: Pair?) : LoadState } - private val refreshRequests: SendChannel - - val loadState: StateFlow - - init { - val channel = Channel(capacity = 10, onBufferOverflow = BufferOverflow.DROP_OLDEST) - - refreshRequests = channel - @Suppress("OPT_IN_USAGE") - loadState = prefs.flowPostProLaunch { - loginStateRepository.loggedInState - .mapNotNull { it?.seeded?.proMasterPrivateKey } - }.distinctUntilChanged() - .flatMapLatest { proMasterKey -> - flow { - var last = db.getProDetailsAndLastUpdated() - var numRetried = 0 - - while (true) { - // Drain all pending requests as we are about to execute a request - while (channel.tryReceive().isSuccess) { } - - var retryingAt: Instant? = null - - if (last != null && last.second.plusSeconds(MIN_UPDATE_INTERVAL_SECONDS) >= Instant.now()) { - Log.d(TAG, "Pro details is fresh enough, skipping fetch") - // Last update was recent enough, skip fetching - emit(LoadState.Loaded(last)) - } else { - if (!networkConnectivity.networkAvailable.value) { - // No network...mark the state and wait for it to come back - emit(LoadState.Loading(last, waitingForNetwork = true)) - networkConnectivity.networkAvailable.first { it } - } - - emit(LoadState.Loading(last, waitingForNetwork = false)) - - // Fetch new details - try { - Log.d(TAG, "Start fetching Pro details from backend") - last = apiExecutor.executeRequest( - request = getProDetailsRequestFactory.create(proMasterKey) - ).successOrThrow() to Instant.now() - - db.updateProDetails(last.first, last.second) - - Log.d(TAG, "Successfully fetched Pro details from backend") - emit(LoadState.Loaded(last)) - numRetried = 0 - } catch (e: Exception) { - if (e is CancellationException) throw e - - emit(LoadState.Error(last)) - - // Exponential backoff for retries, capped at 2 minutes - val delaySeconds = minOf(10L * (1L shl numRetried), 120L) - Log.e(TAG, "Error fetching Pro details from backend, retrying in ${delaySeconds}s", e) - - retryingAt = Instant.now().plusSeconds(delaySeconds) - numRetried++ - } - } + val loadState: StateFlow = combine( + FetchProDetailsWorker.watch(application) + .map { it.state } + .distinctUntilChanged(), + + db.proDetailsChangeNotification + .onStart { emit(Unit) } + .map { db.getProDetailsAndLastUpdated() } + ) { state, last -> + when (state) { + WorkInfo.State.ENQUEUED, WorkInfo.State.BLOCKED -> LoadState.Loading(last, waitingForNetwork = true) + WorkInfo.State.RUNNING -> LoadState.Loading(last, waitingForNetwork = false) + WorkInfo.State.SUCCEEDED -> { + if (last != null) { + LoadState.Loaded(last) + } else { + // This should never happen, but just in case... + LoadState.Error(null) + } + } - // Wait until either a refresh is requested, or it's time to retry - select { - refreshRequests.onReceiveCatching { - Log.d(TAG, "Manual refresh requested") - } + WorkInfo.State.FAILED, WorkInfo.State.CANCELLED -> LoadState.Error(last) + } + }.stateIn(scope, SharingStarted.Eagerly, LoadState.Init) + + + /** + * Requests a fresh of current user's pro details. By default, if last update is recent enough, + * no network request will be made. If [force] is true, a network request will be + * made regardless of the freshness of the last update. + */ + fun requestRefresh(force: Boolean = false) { + val currentState = loadState.value + if (!force && (currentState is LoadState.Loading || currentState is LoadState.Loaded) && + currentState.lastUpdated?.second?.plusSeconds(MIN_UPDATE_INTERVAL_SECONDS) + ?.isBefore(snodeClock.currentTime()) == true) { + Log.d(TAG, "Pro details are fresh enough, skipping refresh") + return + } - if (retryingAt != null) { - val delayMillis = - Duration.between(Instant.now(), retryingAt).toMillis() - onTimeout(delayMillis) { - Log.d(TAG, "Retrying Pro details fetch after delay") - } - } - } - } - } - }.stateIn(scope, SharingStarted.Eagerly, LoadState.Init) + Log.d(TAG, "Scheduling fetch of Pro details from server") + FetchProDetailsWorker.schedule(application, ExistingWorkPolicy.KEEP) } - fun requestRefresh() { - refreshRequests.trySend(Unit) - } companion object { private const val TAG = "ProDetailsRepository" diff --git a/app/src/main/java/org/thoughtcrime/securesms/pro/ProProofGenerationWorker.kt b/app/src/main/java/org/thoughtcrime/securesms/pro/ProProofGenerationWorker.kt new file mode 100644 index 0000000000..5743121bec --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/pro/ProProofGenerationWorker.kt @@ -0,0 +1,114 @@ +package org.thoughtcrime.securesms.pro + +import android.content.Context +import androidx.hilt.work.HiltWorker +import androidx.work.BackoffPolicy +import androidx.work.Constraints +import androidx.work.CoroutineWorker +import androidx.work.ExistingWorkPolicy +import androidx.work.NetworkType +import androidx.work.OneTimeWorkRequestBuilder +import androidx.work.WorkManager +import androidx.work.WorkerParameters +import androidx.work.await +import dagger.assisted.Assisted +import dagger.assisted.AssistedInject +import kotlinx.coroutines.CancellationException +import org.session.libsession.snode.OnionRequestAPI +import org.session.libsession.snode.SnodeClock +import org.session.libsignal.exceptions.NonRetryableException +import org.session.libsignal.utilities.Log +import org.thoughtcrime.securesms.auth.LoginStateRepository +import org.thoughtcrime.securesms.pro.api.GenerateProProofRequest +import org.thoughtcrime.securesms.pro.api.ProApiExecutor +import org.thoughtcrime.securesms.pro.api.ProDetails +import org.thoughtcrime.securesms.pro.api.successOrThrow +import org.thoughtcrime.securesms.pro.db.ProDatabase +import org.thoughtcrime.securesms.util.getRootCause +import java.time.Duration +import java.time.Instant + +/** + * A worker that generates a new [network.loki.messenger.libsession_util.pro.ProProof] and stores it + * locally. + * + * Normally you don't need to interact with this worker directly, as it is scheduled + * automatically when needed based on the Pro details state, by the [FetchProDetailsWorker]. + */ +@HiltWorker +class ProProofGenerationWorker @AssistedInject constructor( + @Assisted context: Context, + @Assisted params: WorkerParameters, + private val proApiExecutor: ProApiExecutor, + private val proDatabase: ProDatabase, + private val generateProProofRequest: GenerateProProofRequest.Factory, + private val proDetailsRepository: ProDetailsRepository, + private val loginStateRepository: LoginStateRepository, + private val snodeClock: SnodeClock, +) : CoroutineWorker(context, params) { + override suspend fun doWork(): Result { + val proMasterKey = requireNotNull(loginStateRepository.peekLoginState()?.seeded?.proMasterPrivateKey) { + "User must be logged to generate proof" + } + + val details = checkNotNull(proDetailsRepository.loadState.value.lastUpdated) { + "Pro details must be available to generate proof" + } + + check(details.first.status == ProDetails.DETAILS_STATUS_ACTIVE) { + "Pro status must be active to generate proof" + } + + return try { + val proof = proApiExecutor.executeRequest( + request = generateProProofRequest.create( + masterPrivateKey = proMasterKey, + rotatingPrivateKey = proDatabase.ensureValidRotatingKeys(snodeClock.currentTime()).ed25519PrivKey + ) + ).successOrThrow() + + proDatabase.updateCurrentProProof(proof) + + Log.d(WORK_NAME, "Successfully generated a new pro proof expiring at ${Instant.ofEpochMilli(proof.expiryMs)}") + Result.success() + } catch (e: Exception) { + if (e is CancellationException) throw e + + Log.e(WORK_NAME, "Error generating Pro proof", e) + if (e is NonRetryableException || + // HTTP 403 indicates that the user is not + e.getRootCause()?.statusCode == 403) { + Result.failure() + } else { + Result.retry() + } + } + } + + companion object { + private const val WORK_NAME = "ProProofGenerationWorker" + + suspend fun schedule(context: Context, delay: Duration? = null) { + WorkManager.getInstance(context) + .enqueueUniqueWork(WORK_NAME, + ExistingWorkPolicy.REPLACE, + OneTimeWorkRequestBuilder() + .apply { + if (delay != null) { + setInitialDelay(delay) + } + } + .setConstraints(Constraints(requiredNetworkType = NetworkType.CONNECTED)) + .setBackoffCriteria(BackoffPolicy.EXPONENTIAL, Duration.ofSeconds(10)) + .build() + ) + .await() + } + + suspend fun cancel(context: Context) { + WorkManager.getInstance(context) + .cancelUniqueWork(WORK_NAME) + .await() + } + } +} \ No newline at end of file diff --git a/app/src/main/java/org/thoughtcrime/securesms/pro/ProStatePoller.kt b/app/src/main/java/org/thoughtcrime/securesms/pro/ProStatePoller.kt deleted file mode 100644 index 7fcee40396..0000000000 --- a/app/src/main/java/org/thoughtcrime/securesms/pro/ProStatePoller.kt +++ /dev/null @@ -1,228 +0,0 @@ -package org.thoughtcrime.securesms.pro - -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.flow.SharingStarted -import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.flow.distinctUntilChanged -import kotlinx.coroutines.flow.emptyFlow -import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.first -import kotlinx.coroutines.flow.flatMapLatest -import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.mapNotNull -import kotlinx.coroutines.flow.onStart -import kotlinx.coroutines.flow.stateIn -import kotlinx.coroutines.selects.onTimeout -import kotlinx.coroutines.selects.select -import org.session.libsession.snode.SnodeClock -import org.session.libsession.utilities.TextSecurePreferences -import org.session.libsignal.utilities.Log -import org.thoughtcrime.securesms.auth.LoginStateRepository -import org.thoughtcrime.securesms.dependencies.ManagerScope -import org.thoughtcrime.securesms.dependencies.OnAppStartupComponent -import org.thoughtcrime.securesms.pro.api.GenerateProProofRequest -import org.thoughtcrime.securesms.pro.api.GetProDetailsRequest -import org.thoughtcrime.securesms.pro.api.GetProRevocationRequest -import org.thoughtcrime.securesms.pro.api.ProApiExecutor -import org.thoughtcrime.securesms.pro.api.ProDetails -import org.thoughtcrime.securesms.pro.api.successOrThrow -import org.thoughtcrime.securesms.pro.db.ProDatabase -import org.thoughtcrime.securesms.util.NetworkConnectivity -import org.thoughtcrime.securesms.util.castAwayType -import java.time.Duration -import java.time.Instant -import javax.inject.Inject -import javax.inject.Singleton - -typealias PollToken = Channel> - -@Singleton -class ProStatePoller @Inject constructor( - loginStateRepository: LoginStateRepository, - private val connectivity: NetworkConnectivity, - private val getProDetailsRequestFactory: GetProDetailsRequest.Factory, - private val generateProProofRequest: GenerateProProofRequest.Factory, - private val getProRevocationRequestFactory: GetProRevocationRequest.Factory, - private val proDatabase: ProDatabase, - private val snodeClock: SnodeClock, - private val apiExecutor: ProApiExecutor, - private val proDetailsRepository: ProDetailsRepository, - prefs: TextSecurePreferences, - @ManagerScope scope: CoroutineScope, -): OnAppStartupComponent { - private val manualPollRequest = Channel() - - enum class PollState { - Init, - Polling, - UpToDate, - } - - @OptIn(ExperimentalCoroutinesApi::class) - val pollState: StateFlow = prefs.flowPostProLaunch { - loginStateRepository - .loggedInState - .map { it?.seeded?.proMasterPrivateKey } - } - .distinctUntilChanged() - .flatMapLatest { proMasterPrivateKey -> - if (proMasterPrivateKey == null) { - return@flatMapLatest emptyFlow() - } - - flow { - var numRetried = 0 - var nextPoll: Instant? = null - val pollTokens = mutableListOf() - - while (true) { - // Wait for network to become available - connectivity.networkAvailable.first { it } - - if (nextPoll != null) { - val now = Instant.now() - if (now < nextPoll) { - val delayMillis = Duration.between(now, nextPoll).toMillis() - Log.d(TAG, "Delaying next poll for $delayMillis ms") - select { - onTimeout(delayMillis) {} - manualPollRequest.onReceiveCatching { - if (it.isSuccess) { - pollTokens.add(it.getOrThrow()) - } - Log.d(TAG, "Manual poll requested") - } - } - } - } - - // Drain any manual poll requests - while (true) { - val received = manualPollRequest.tryReceive() - if (received.isSuccess) { - pollTokens += received.getOrThrow() - } else { - break - } - } - - emit(PollState.Polling) - Log.d(TAG, "Start polling Pro state") - - val result = runCatching { - pollOnce(proMasterPrivateKey) - emit(PollState.UpToDate) - Log.d(TAG, "Pro state polled successful") - } - - pollTokens.forEach { it.trySend(result) } - - nextPoll = when { - result.isSuccess -> { - numRetried = 0 - Instant.now().plusSeconds(POLL_INTERVAL_MINUTES * 60) - } - - result.exceptionOrNull() is CancellationException -> { - throw result.exceptionOrNull()!! - } - - else -> { - numRetried++ - val delaySeconds = (POLL_RETRY_INTERVAL_MIN_SECONDS * numRetried * 1.2).toLong() - .coerceAtMost(POLL_INTERVAL_MINUTES * 60) - - Log.e(TAG, "Error polling pro state, retrying in $delaySeconds seconds", result.exceptionOrNull()) - - Instant.now().plusSeconds(delaySeconds) - } - } - } - } - } - .stateIn(scope, started = SharingStarted.Eagerly, initialValue = PollState.Init) - - suspend fun requestPollOnceAndWait() { - val channel = Channel>() - manualPollRequest.send(channel) - channel.receive().getOrThrow() - } - - private suspend fun pollOnce(proMasterPrivateKey: ByteArray) { - val currentProof = proDatabase.getCurrentProProof() - - if (currentProof == null || currentProof.expiryMs <= snodeClock.currentTimeMills()) { - // Current proof is missing or expired, grab the pro details to decide what to do next - proDetailsRepository.requestRefresh() - - val details = proDetailsRepository.loadState.mapNotNull { it.lastUpdated }.first().first - - val newProof = if (details.status == ProDetails.DETAILS_STATUS_ACTIVE) { - Log.d(TAG, "User is active Pro but has no valid proof, generating new proof") - apiExecutor.executeRequest( - request = generateProProofRequest.create( - masterPrivateKey = proMasterPrivateKey, - rotatingPrivateKey = proDatabase.ensureValidRotatingKeys(snodeClock.currentTime()).ed25519PrivKey - ) - ).successOrThrow() - } else { - Log.d(TAG, "User is not active pro") - null - } - - Log.d(TAG, "Updating current pro proof to $newProof") - proDatabase.updateCurrentProProof(newProof) - } else { - // Current proof is still valid, we just need to check for revocation - val lastTicket = proDatabase.getLastRevocationTicket() - - val revocations = apiExecutor.executeRequest( - request = getProRevocationRequestFactory.create(lastTicket) - ).successOrThrow() - - proDatabase.updateRevocations( - newTicket = revocations.ticket, - data = revocations.items - ) - - if (proDatabase.isRevoked(currentProof.genIndexHashHex)) { - Log.d(TAG, "Current pro proof has been revoked, deleting") - proDatabase.updateCurrentProProof(null) - } else { - Log.d(TAG, "Current pro proof is still valid and not revoked") - } - } - } - - companion object { - private const val TAG = "ProStatePoller" - - - private const val POLL_INTERVAL_MINUTES = 1L - private const val POLL_RETRY_INTERVAL_MIN_SECONDS = 5L - -// fun schedule(context: Context) { -// WorkManager.getInstance(context) -// .enqueueUniquePeriodicWork( -// WORK_NAME, -// ExistingPeriodicWorkPolicy.KEEP, -// androidx.work.PeriodicWorkRequestBuilder( -// repeatInterval = Duration.ofMinutes(15) -// ).setConstraints( -// Constraints(requiredNetworkType = NetworkType.CONNECTED) -// ).setInitialDelay(5, TimeUnit.SECONDS) -// .setBackoffCriteria(BackoffPolicy.EXPONENTIAL, 10, TimeUnit.SECONDS) -// .build() -// ) -// } -// -// fun cancel(context: Context) { -// WorkManager.getInstance(context) -// .cancelUniqueWork(WORK_NAME) -// } - } -} \ No newline at end of file diff --git a/app/src/main/java/org/thoughtcrime/securesms/pro/ProStatusManager.kt b/app/src/main/java/org/thoughtcrime/securesms/pro/ProStatusManager.kt index 8596c696af..1a84c4b42b 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/pro/ProStatusManager.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/pro/ProStatusManager.kt @@ -1,5 +1,7 @@ package org.thoughtcrime.securesms.pro +import android.app.Application +import dagger.Lazy import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi @@ -8,13 +10,21 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.combine +import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.filterNotNull +import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.mapLatest +import kotlinx.coroutines.flow.mapNotNull +import kotlinx.coroutines.flow.merge import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.stateIn import kotlinx.coroutines.flow.update import kotlinx.coroutines.launch +import kotlinx.coroutines.time.delay import kotlinx.coroutines.withTimeout import network.loki.messenger.libsession_util.pro.BackendRequests import network.loki.messenger.libsession_util.pro.BackendRequests.PAYMENT_PROVIDER_APP_STORE @@ -22,8 +32,11 @@ import network.loki.messenger.libsession_util.pro.BackendRequests.PAYMENT_PROVID import network.loki.messenger.libsession_util.protocol.ProFeatures import org.session.libsession.messaging.messages.visible.VisibleMessage import org.session.libsession.snode.SnodeClock +import org.session.libsession.utilities.ConfigFactoryProtocol import org.session.libsession.utilities.TextSecurePreferences +import org.session.libsession.utilities.UserConfigType import org.session.libsession.utilities.recipients.Recipient +import org.session.libsession.utilities.userConfigsChanged import org.session.libsignal.utilities.Log import org.thoughtcrime.securesms.auth.LoginStateRepository import org.thoughtcrime.securesms.database.RecipientRepository @@ -43,11 +56,14 @@ import org.thoughtcrime.securesms.pro.subscription.SubscriptionManager import org.thoughtcrime.securesms.util.State import java.time.Duration import java.time.Instant +import java.util.EnumSet import javax.inject.Inject import javax.inject.Singleton +@OptIn(ExperimentalCoroutinesApi::class) @Singleton class ProStatusManager @Inject constructor( + private val application: Application, private val prefs: TextSecurePreferences, recipientRepository: RecipientRepository, @param:ManagerScope private val scope: CoroutineScope, @@ -55,6 +71,8 @@ class ProStatusManager @Inject constructor( private val loginState: LoginStateRepository, private val proDatabase: ProDatabase, private val snodeClock: SnodeClock, + private val proDetailsRepository: ProDetailsRepository, + private val configFactory: Lazy, ) : OnAppStartupComponent { val proDataState: StateFlow = combine( @@ -157,6 +175,120 @@ class ProStatusManager @Inject constructor( _postProLaunchStatus.update { isPostPro() } } } + + loginState.runWhileLoggedIn(scope) { + postProLaunchStatus + .collectLatest { postLaunch -> + if (postLaunch) { + RevocationListPollingWorker.schedule(application) + } else { + RevocationListPollingWorker.cancel(application) + } + } + } + + manageProDetailsRefreshScheduling() + manageCurrentProProofRevocation() + } + + private fun manageProDetailsRefreshScheduling() { + loginState.runWhileLoggedIn(scope) { + postProLaunchStatus + .collectLatest { postLaunch -> + if (postLaunch) { + merge( + configFactory.get() + .userConfigsChanged(EnumSet.of(UserConfigType.USER_PROFILE)) + .map { "UserProfile changes" }, //TODO: also schedule it when the real config changes (not integrated yet) + + proDetailsRepository.loadState + .mapNotNull { it.lastUpdated?.first?.expiry } + .distinctUntilChanged() + .mapLatest { expiry -> + // Schedule a refresh 30seconds after access expiry + val refreshTime = expiry.plusSeconds(30) + + val now = snodeClock.currentTime() + if (now < refreshTime) { + val duration = Duration.between(now, refreshTime) + Log.d( + DebugLogGroup.PRO_SUBSCRIPTION.label, + "Delaying ProDetails refresh until $refreshTime due to access expiry" + ) + delay(duration) + } + + "ProDetails expiry reached" + }, + + proDatabase.currentProProofChangesNotification + .onStart { emit(Unit) } + .mapNotNull { + proDatabase.getCurrentProProof()?.expiryMs?.let( + Instant::ofEpochMilli + ) + } + .mapLatest { expiry -> + // Schedule a refresh for a random number between 10 and 60 minutes before proof expiry + val now = snodeClock.currentTime() + + val refreshTime = + expiry.minus(Duration.ofMinutes((10..60).random().toLong())) + + if (now < refreshTime) { + Log.d( + DebugLogGroup.PRO_SUBSCRIPTION.label, + "Delaying ProDetails refresh until $refreshTime due to proof expiry" + ) + delay(Duration.between(now, expiry)) + } + }, + + flowOf("App starting up") + ).collect { refreshReason -> + Log.d( + DebugLogGroup.PRO_SUBSCRIPTION.label, + "Scheduling ProDetails fetch due to: $refreshReason" + ) + + proDetailsRepository.requestRefresh() + } + } else { + FetchProDetailsWorker.cancel(application) + } + } + } + } + + private fun manageCurrentProProofRevocation() { + loginState.runWhileLoggedIn(scope) { + postProLaunchStatus.collectLatest { postLaunch -> + if (postLaunch) { + combine( + proDatabase.currentProProofChangesNotification + .onStart { emit(Unit) } + .mapNotNull { proDatabase.getCurrentProProof()?.genIndexHashHex }, + + proDatabase.revocationChangeNotification + .onStart { emit(Unit) }, + + { proofGenIndexHash, _ -> + proofGenIndexHash.takeIf { proDatabase.isRevoked(it) } + } + ) + .filterNotNull() + .collectLatest { revokedHash -> + if (revokedHash == proDatabase.getCurrentProProof()?.genIndexHashHex) { + Log.w( + DebugLogGroup.PRO_SUBSCRIPTION.label, + "Current Pro proof has been revoked, clearing local proof" + ) + proDatabase.updateCurrentProProof(null) + } + } + } + } + } } /** diff --git a/app/src/main/java/org/thoughtcrime/securesms/pro/RevocationListPollingWorker.kt b/app/src/main/java/org/thoughtcrime/securesms/pro/RevocationListPollingWorker.kt new file mode 100644 index 0000000000..6c4b238fd6 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/pro/RevocationListPollingWorker.kt @@ -0,0 +1,88 @@ +package org.thoughtcrime.securesms.pro + +import android.content.Context +import androidx.hilt.work.HiltWorker +import androidx.work.BackoffPolicy +import androidx.work.Constraints +import androidx.work.CoroutineWorker +import androidx.work.ExistingPeriodicWorkPolicy +import androidx.work.NetworkType +import androidx.work.PeriodicWorkRequestBuilder +import androidx.work.WorkManager +import androidx.work.WorkerParameters +import androidx.work.await +import dagger.assisted.Assisted +import dagger.assisted.AssistedInject +import org.session.libsession.snode.SnodeClock +import org.session.libsignal.exceptions.NonRetryableException +import org.session.libsignal.utilities.Log +import org.thoughtcrime.securesms.pro.api.GetProRevocationRequest +import org.thoughtcrime.securesms.pro.api.ProApiExecutor +import org.thoughtcrime.securesms.pro.api.successOrThrow +import org.thoughtcrime.securesms.pro.db.ProDatabase +import java.time.Duration +import java.util.concurrent.TimeUnit +import kotlin.coroutines.cancellation.CancellationException + +/** + * A long running worker which periodically polls the revocation list and updates the local database. + */ +@HiltWorker +class RevocationListPollingWorker @AssistedInject constructor( + @Assisted context: Context, + @Assisted params: WorkerParameters, + private val proDatabase: ProDatabase, + private val getProRevocationRequestFactory: GetProRevocationRequest.Factory, + private val proApiExecutor: ProApiExecutor, + private val snodeClock: SnodeClock, +) : CoroutineWorker(context, params) { + override suspend fun doWork(): Result { + try { + val lastTicket = proDatabase.getLastRevocationTicket() + val response = proApiExecutor.executeRequest(request = getProRevocationRequestFactory.create(lastTicket)).successOrThrow() + proDatabase.updateRevocations( + data = response.items, + newTicket = response.ticket + ) + + proDatabase.pruneRevocations(snodeClock.currentTime()) + + return Result.success() + } catch (e: Exception) { + if (e is CancellationException) { + throw e + } + + Log.e(TAG, "Error polling revocation list", e) + return if (e is NonRetryableException) { + Result.failure() + } else { + Result.retry() + } + } + } + + companion object { + private const val TAG = "RevocationListPollingWorker" + + private const val WORK_NAME = "RevocationListPollingWorker" + + suspend fun schedule(context: Context) { + WorkManager.getInstance(context) + .enqueueUniquePeriodicWork(WORK_NAME, ExistingPeriodicWorkPolicy.KEEP, + PeriodicWorkRequestBuilder(Duration.ofMinutes(15)) + .setInitialDelay(0L, TimeUnit.MILLISECONDS) + .setBackoffCriteria(BackoffPolicy.EXPONENTIAL, Duration.ofSeconds(10)) + .setConstraints(Constraints(requiredNetworkType = NetworkType.CONNECTED)) + .build() + ) + .await() + } + + suspend fun cancel(context: Context) { + WorkManager.getInstance(context) + .cancelUniqueWork(WORK_NAME) + .await() + } + } +} \ No newline at end of file diff --git a/app/src/main/java/org/thoughtcrime/securesms/pro/db/ProDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/pro/db/ProDatabase.kt index 5497967abe..7144182e8d 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/pro/db/ProDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/pro/db/ProDatabase.kt @@ -230,6 +230,13 @@ class ProDatabase @Inject constructor( } } + private val mutableProDetailsChangeNotification = MutableSharedFlow( + extraBufferCapacity = 10, + onBufferOverflow = BufferOverflow.DROP_OLDEST + ) + + val proDetailsChangeNotification: SharedFlow get() = mutableProDetailsChangeNotification + fun getProDetailsAndLastUpdated(): Pair? { return readableDatabase.rawQuery(""" SELECT name, value FROM pro_state @@ -255,12 +262,22 @@ class ProDatabase @Inject constructor( } fun updateProDetails(proDetails: ProDetails, updatedAt: Instant) { - //language=roomsql - writableDatabase.rawExecSQL(""" - INSERT OR REPLACE INTO pro_state (name, value) + val changes = writableDatabase.compileStatement(""" + INSERT INTO pro_state (name, value) VALUES (?, ?), (?, ?) - """, STATE_PRO_DETAILS, json.encodeToString(proDetails), - STATE_PRO_DETAILS_UPDATED_AT, updatedAt.toEpochMilli().toString()) + ON CONFLICT DO UPDATE SET value=excluded.value + WHERE value != excluded.value + """).use { stmt -> + stmt.bindString(1, STATE_PRO_DETAILS) + stmt.bindString(2, json.encodeToString(proDetails)) + stmt.bindString(3, STATE_PRO_DETAILS_UPDATED_AT) + stmt.bindString(4, updatedAt.toEpochMilli().toString()) + stmt.executeUpdateDelete() + } + + if (changes > 0) { + mutableProDetailsChangeNotification.tryEmit(Unit) + } } @Serializable