diff --git a/app/src/main/java/to/bitkit/repositories/ActivityRepo.kt b/app/src/main/java/to/bitkit/repositories/ActivityRepo.kt index cedb68f2b..05b743878 100644 --- a/app/src/main/java/to/bitkit/repositories/ActivityRepo.kt +++ b/app/src/main/java/to/bitkit/repositories/ActivityRepo.kt @@ -7,6 +7,8 @@ import com.synonym.bitkitcore.PaymentType import com.synonym.bitkitcore.SortDirection import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.TimeoutCancellationException +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.map @@ -26,7 +28,6 @@ import to.bitkit.ext.rawId import to.bitkit.services.CoreService import to.bitkit.utils.AddressChecker import to.bitkit.utils.Logger -import java.security.InvalidParameterException import javax.inject.Inject import javax.inject.Singleton @@ -64,7 +65,7 @@ class ActivityRepo @Inject constructor( return@withContext Result.failure(e) } updateActivitiesMetadata() - syncTagsMetaData() + syncTagsMetadata() boostPendingActivities() updateInProgressTransfers() isSyncingLdkNodePayments.value = false @@ -245,6 +246,10 @@ class ActivityRepo @Inject constructor( "Activity $id updated with success. new data: $activity. Deleting activity $activityIdToDelete", context = TAG ) + + val tags = coreService.activity.tags(activityIdToDelete) + addTagsToActivity(activityId = id, tags = tags) + deleteActivity(activityIdToDelete).onFailure { e -> Logger.warn( "Failed to delete $activityIdToDelete caching to retry on next sync", @@ -267,131 +272,147 @@ class ActivityRepo @Inject constructor( } private suspend fun deletePendingActivities() = withContext(bgDispatcher) { - cacheStore.data.first().activitiesPendingDelete.forEach { activityId -> - deleteActivity(id = activityId).onSuccess { - cacheStore.removeActivityFromPendingDelete(activityId) + cacheStore.data.first().activitiesPendingDelete.map { activityId -> + async { + deleteActivity(id = activityId).onSuccess { + cacheStore.removeActivityFromPendingDelete(activityId) + } } - } + }.awaitAll() } private suspend fun updateActivitiesMetadata() = withContext(bgDispatcher) { - cacheStore.data.first().transactionsMetadata.forEach { activityMetaData -> - findActivityByPaymentId( - paymentHashOrTxId = activityMetaData.txId, - type = ActivityFilter.ALL, - txType = PaymentType.SENT - ).onSuccess { activityToUpdate -> - Logger.debug("updateActivitiesMetaData = Activity found: ${activityToUpdate.rawId()}", context = TAG) - - when (activityToUpdate) { - is Activity.Onchain -> { - val onChainActivity = activityToUpdate.v1.copy( - feeRate = activityMetaData.feeRate.toULong(), - address = activityMetaData.address, - isTransfer = activityMetaData.isTransfer, - channelId = activityMetaData.channelId, - transferTxId = activityMetaData.transferTxId - ) - val updatedActivity = Onchain( - v1 = onChainActivity - ) + cacheStore.data.first().transactionsMetadata.map { activityMetaData -> + async { + findActivityByPaymentId( + paymentHashOrTxId = activityMetaData.txId, + type = ActivityFilter.ALL, + txType = PaymentType.SENT + ).onSuccess { activityToUpdate -> + Logger.debug( + "updateActivitiesMetaData - Activity found: ${activityToUpdate.rawId()}", + context = TAG + ) - updateActivity( - id = updatedActivity.v1.id, - activity = updatedActivity - ).onSuccess { - if (onChainActivity.isTransfer && onChainActivity.doesExist) { - cacheStore.addInProgressTransfer( - InProgressTransfer( - activityId = updatedActivity.v1.id, - type = if (onChainActivity.txType == PaymentType.SENT) { - TransferType.TO_SPENDING - } else { - TransferType.TO_SAVINGS - } + when (activityToUpdate) { + is Activity.Onchain -> { + val onChainActivity = activityToUpdate.v1.copy( + feeRate = activityMetaData.feeRate.toULong(), + address = activityMetaData.address, + isTransfer = activityMetaData.isTransfer, + channelId = activityMetaData.channelId, + transferTxId = activityMetaData.transferTxId + ) + val updatedActivity = Onchain( + v1 = onChainActivity + ) + + updateActivity( + id = updatedActivity.v1.id, + activity = updatedActivity + ).onSuccess { + if (onChainActivity.isTransfer && onChainActivity.doesExist) { + cacheStore.addInProgressTransfer( + InProgressTransfer( + activityId = updatedActivity.v1.id, + type = if (onChainActivity.txType == PaymentType.SENT) { + TransferType.TO_SPENDING + } else { + TransferType.TO_SAVINGS + } + ) ) - ) + } + cacheStore.removeTransactionMetadata(activityMetaData) } - cacheStore.removeTransactionMetadata(activityMetaData) } - } - is Activity.Lightning -> Unit + is Activity.Lightning -> Unit + } } } - } + }.awaitAll() } - private suspend fun syncTagsMetaData( - ) = withContext(context = bgDispatcher) { + private suspend fun syncTagsMetadata() = withContext(context = bgDispatcher) { runCatching { if (db.tagMetadataDao().getAll().isEmpty()) return@withContext val lastActivities = getActivities(limit = 10u).getOrNull() ?: return@withContext - Logger.debug("syncTagsMetaData called") - lastActivities.forEach { activity -> - when (activity) { - is Activity.Lightning -> { - val paymentHash = activity.rawId() - db.tagMetadataDao().searchByPaymentHash(paymentHash = paymentHash)?.let { tagMetadata -> - Logger.debug("Tags metadata found! $tagMetadata", context = TAG) - addTagsToTransaction( - paymentHashOrTxId = paymentHash, - type = ActivityFilter.LIGHTNING, - txType = if (tagMetadata.isReceive) PaymentType.RECEIVED else PaymentType.SENT, - tags = tagMetadata.tags - ).onSuccess { - Logger.debug("Tags synced with success!", context = TAG) - db.tagMetadataDao().deleteByPaymentHash(paymentHash = paymentHash) + Logger.debug("syncTagsMetadata called") + + lastActivities.map { activity -> + async { + when (activity) { + is Activity.Lightning -> { + val paymentHash = activity.rawId() + db.tagMetadataDao().searchByPaymentHash(paymentHash = paymentHash)?.let { tagMetadata -> + Logger.debug("Tags metadata found! $tagMetadata", context = TAG) + addTagsToTransaction( + paymentHashOrTxId = paymentHash, + type = ActivityFilter.LIGHTNING, + txType = if (tagMetadata.isReceive) PaymentType.RECEIVED else PaymentType.SENT, + tags = tagMetadata.tags + ).onSuccess { + Logger.debug("Tags synced with success!", context = TAG) + db.tagMetadataDao().deleteByPaymentHash(paymentHash = paymentHash) + } } } - } - is Onchain -> { - when (activity.v1.txType) { - PaymentType.RECEIVED -> { - // TODO Temporary solution while whe ldk-node doesn't return the address directly - Logger.debug("Fetching data for txId: ${activity.v1.txId}", context = TAG) - runCatching { addressChecker.getTransaction(activity.v1.txId) }.onSuccess { txDetails -> - Logger.debug("Tx detail fetched with success: $txDetails", context = TAG) - txDetails.vout.forEach { vOut -> - vOut.scriptpubkey_address?.let { - Logger.debug("Extracted address: $it", context = TAG) - db.tagMetadataDao().searchByAddress(it) - }?.let { tagMetadata -> - Logger.debug("Tags metadata found! $tagMetadata", context = TAG) - addTagsToTransaction( - paymentHashOrTxId = txDetails.txid, - type = ActivityFilter.ONCHAIN, - txType = PaymentType.RECEIVED, - tags = tagMetadata.tags - ).onSuccess { - Logger.debug("Tags synced with success! $tagMetadata", context = TAG) - db.tagMetadataDao().deleteByTxId(activity.v1.txId) + is Onchain -> { + when (activity.v1.txType) { + PaymentType.RECEIVED -> { + // TODO Temporary solution while whe ldk-node doesn't return the address directly + Logger.debug("Fetching data for txId: ${activity.v1.txId}", context = TAG) + runCatching { + addressChecker.getTransaction(activity.v1.txId) + }.onSuccess { txDetails -> + Logger.debug("Tx detail fetched with success: $txDetails", context = TAG) + txDetails.vout.map { vOut -> + async { + vOut.scriptpubkey_address?.let { + Logger.debug("Extracted address: $it", context = TAG) + db.tagMetadataDao().searchByAddress(it) + }?.let { tagMetadata -> + Logger.debug("Tags metadata found! $tagMetadata", context = TAG) + addTagsToTransaction( + paymentHashOrTxId = txDetails.txid, + type = ActivityFilter.ONCHAIN, + txType = PaymentType.RECEIVED, + tags = tagMetadata.tags + ).onSuccess { + Logger.debug( + "Tags synced with success! $tagMetadata", + context = TAG + ) + db.tagMetadataDao().deleteByTxId(activity.v1.txId) + } + } } - } + }.awaitAll() + }.onFailure { + Logger.warn("Failed getting transaction detail", context = TAG) } - }.onFailure { - Logger.warn("Failed getting transaction detail", context = TAG) } - } - PaymentType.SENT -> { - db.tagMetadataDao().searchByTxId(activity.v1.txId)?.let { tagMetadata -> - addTagsToTransaction( - paymentHashOrTxId = activity.v1.txId, - type = ActivityFilter.ONCHAIN, - txType = PaymentType.SENT, - tags = tagMetadata.tags - ).onSuccess { - Logger.debug("Tags synced with success! $tagMetadata", context = TAG) - db.tagMetadataDao().deleteByTxId(activity.v1.txId) + PaymentType.SENT -> { + db.tagMetadataDao().searchByTxId(activity.v1.txId)?.let { tagMetadata -> + addTagsToTransaction( + paymentHashOrTxId = activity.v1.txId, + type = ActivityFilter.ONCHAIN, + txType = PaymentType.SENT, + tags = tagMetadata.tags + ).onSuccess { + Logger.debug("Tags synced with success! $tagMetadata", context = TAG) + db.tagMetadataDao().deleteByTxId(activity.v1.txId) + } } } } } } } - } + }.awaitAll() } } @@ -408,46 +429,48 @@ class ActivityRepo @Inject constructor( } private suspend fun boostPendingActivities() = withContext(bgDispatcher) { - cacheStore.data.first().pendingBoostActivities.forEach { pendingBoostActivity -> - findActivityByPaymentId( - paymentHashOrTxId = pendingBoostActivity.txId, - type = ActivityFilter.ONCHAIN, - txType = PaymentType.SENT - ).onSuccess { activityToUpdate -> - Logger.debug("boostPendingActivities = Activity found: ${activityToUpdate.rawId()}", context = TAG) - - val newOnChainActivity = activityToUpdate as? Activity.Onchain ?: return@onSuccess - - if ((newOnChainActivity.v1.updatedAt ?: 0u) > pendingBoostActivity.updatedAt) { - cacheStore.removeActivityFromPendingBoost(pendingBoostActivity) - return@onSuccess - } + cacheStore.data.first().pendingBoostActivities.map { pendingBoostActivity -> + async { + findActivityByPaymentId( + paymentHashOrTxId = pendingBoostActivity.txId, + type = ActivityFilter.ONCHAIN, + txType = PaymentType.SENT + ).onSuccess { activityToUpdate -> + Logger.debug("boostPendingActivities = Activity found: ${activityToUpdate.rawId()}", context = TAG) + + val newOnChainActivity = activityToUpdate as? Activity.Onchain ?: return@onSuccess + + if ((newOnChainActivity.v1.updatedAt ?: 0u) > pendingBoostActivity.updatedAt) { + cacheStore.removeActivityFromPendingBoost(pendingBoostActivity) + return@onSuccess + } - val updatedActivity = Activity.Onchain( - v1 = newOnChainActivity.v1.copy( - isBoosted = true, - updatedAt = pendingBoostActivity.updatedAt + val updatedActivity = Activity.Onchain( + v1 = newOnChainActivity.v1.copy( + isBoosted = true, + updatedAt = pendingBoostActivity.updatedAt + ) ) - ) - if (pendingBoostActivity.activityToDelete != null) { - replaceActivity( - id = updatedActivity.v1.id, - activity = updatedActivity, - activityIdToDelete = pendingBoostActivity.activityToDelete - ).onSuccess { - cacheStore.removeActivityFromPendingBoost(pendingBoostActivity) - } - } else { - updateActivity( - id = updatedActivity.v1.id, - activity = updatedActivity - ).onSuccess { - cacheStore.removeActivityFromPendingBoost(pendingBoostActivity) + if (pendingBoostActivity.activityToDelete != null) { + replaceActivity( + id = updatedActivity.v1.id, + activity = updatedActivity, + activityIdToDelete = pendingBoostActivity.activityToDelete + ).onSuccess { + cacheStore.removeActivityFromPendingBoost(pendingBoostActivity) + } + } else { + updateActivity( + id = updatedActivity.v1.id, + activity = updatedActivity + ).onSuccess { + cacheStore.removeActivityFromPendingBoost(pendingBoostActivity) + } } } } - } + }.awaitAll() } /** @@ -579,7 +602,7 @@ class ActivityRepo @Inject constructor( ): Result = withContext(bgDispatcher) { return@withContext runCatching { - if (tags.isEmpty()) throw InvalidParameterException("tags must not be empty") + require(tags.isNotEmpty()) val entity = TagMetadataEntity( id = id, diff --git a/app/src/test/java/to/bitkit/repositories/ActivityRepoTest.kt b/app/src/test/java/to/bitkit/repositories/ActivityRepoTest.kt index fe7d206cf..4421aadb5 100644 --- a/app/src/test/java/to/bitkit/repositories/ActivityRepoTest.kt +++ b/app/src/test/java/to/bitkit/repositories/ActivityRepoTest.kt @@ -232,6 +232,7 @@ class ActivityRepoTest : BaseUnitTest() { fun `replaceActivity updates and deletes successfully`() = test { val activityId = "activity123" val activityToDeleteId = "activity456" + val tagsMock = listOf("tag1", "tag2") val cacheData = AppCacheData(deletedActivities = emptyList()) whenever(cacheStore.data).thenReturn(flowOf(cacheData)) @@ -239,6 +240,8 @@ class ActivityRepoTest : BaseUnitTest() { wheneverBlocking { coreService.activity.delete(activityToDeleteId) }.thenReturn(true) wheneverBlocking { cacheStore.addActivityToDeletedList(activityToDeleteId) }.thenReturn(Unit) + whenever(coreService.activity.tags(activityId)).thenAnswer { tagsMock } + val result = sut.replaceActivity(activityId, activityToDeleteId, testActivity) assertTrue(result.isSuccess)