diff --git a/packages/wallet/src/SingleAddressWallet.ts b/packages/wallet/src/SingleAddressWallet.ts index af681a45ed1..ebb6932018c 100644 --- a/packages/wallet/src/SingleAddressWallet.ts +++ b/packages/wallet/src/SingleAddressWallet.ts @@ -53,14 +53,19 @@ import { import { BehaviorObservable, TrackerSubject } from '@cardano-sdk/util-rxjs'; import { BehaviorSubject, + EMPTY, Subject, + Subscription, + catchError, combineLatest, concat, distinctUntilChanged, filter, firstValueFrom, + from, lastValueFrom, map, + mergeMap, take, tap } from 'rxjs'; @@ -73,6 +78,7 @@ import { TrackedUtxoProvider } from './services/ProviderTracker/TrackedUtxoProvi import { TxInternals, createTransactionInternals, ensureValidityInterval } from './Transaction'; import { WalletStores, createInMemoryWalletStores } from './persistence'; import { cip30signData } from './KeyManagement/cip8'; +import { createTransactionReemitter } from './services/TransactionReemitter'; import isEqual from 'lodash/isEqual'; export interface SingleAddressWalletProps { @@ -105,6 +111,7 @@ export class SingleAddressWallet implements ObservableWallet { pending$: new Subject(), submitting$: new Subject() }; + #resubmitSubscription: Subscription; readonly keyAgent: AsyncKeyAgent; readonly currentEpoch$: TrackerSubject; @@ -288,6 +295,23 @@ export class SingleAddressWallet implements ObservableWallet { tip$: this.tip$, transactionsHistoryStore: stores.transactions }); + + this.#resubmitSubscription = createTransactionReemitter({ + confirmed$: this.transactions.outgoing.confirmed$, + rollback$: this.transactions.rollback$, + store: stores.volatileTransactions, + submitting$: this.transactions.outgoing.submitting$, + tipSlot$: this.tip$.pipe(map((tip) => tip.slot)) + }) + .pipe( + mergeMap((transaction) => from(this.submitTx(transaction))), + catchError((err) => { + this.#logger.error('Failed to resubmit transaction', err); + return EMPTY; + }) + ) + .subscribe(); + this.utxo = createUtxoTracker({ addresses$, retryBackoffConfig, @@ -408,6 +432,7 @@ export class SingleAddressWallet implements ObservableWallet { this.#newTransactions.failedToSubmit$.complete(); this.#newTransactions.pending$.complete(); this.#newTransactions.submitting$.complete(); + this.#resubmitSubscription.unsubscribe(); } #prepareTx(props: InitializeTxProps) { diff --git a/packages/wallet/src/persistence/inMemoryStores/inMemoryWalletStores.ts b/packages/wallet/src/persistence/inMemoryStores/inMemoryWalletStores.ts index 84703abfb6e..2416e63fe17 100644 --- a/packages/wallet/src/persistence/inMemoryStores/inMemoryWalletStores.ts +++ b/packages/wallet/src/persistence/inMemoryStores/inMemoryWalletStores.ts @@ -12,6 +12,7 @@ import { GroupedAddress } from '../../KeyManagement'; import { InMemoryCollectionStore } from './InMemoryCollectionStore'; import { InMemoryDocumentStore } from './InMemoryDocumentStore'; import { InMemoryKeyValueStore } from './InMemoryKeyValueStore'; +import { NewTxAlonzoWithSlot } from '../../services'; import { WalletStores } from '../types'; export class InMemoryTipStore extends InMemoryDocumentStore {} @@ -24,6 +25,7 @@ export class InMemoryTimeSettingsStore extends InMemoryDocumentStore {} export class InMemoryAddressesStore extends InMemoryDocumentStore {} export class InMemoryInFlightTransactionsStore extends InMemoryDocumentStore {} +export class InMemoryVolatileTransactionsStore extends InMemoryDocumentStore {} export class InMemoryTransactionsStore extends InMemoryCollectionStore {} export class InMemoryUtxoStore extends InMemoryCollectionStore {} @@ -54,6 +56,7 @@ export const createInMemoryWalletStores = (): WalletStores => ({ this.tip.destroy(), this.transactions.destroy(), this.inFlightTransactions.destroy(), + this.volatileTransactions.destroy(), this.utxo.destroy() ]).pipe(map(() => void 0)); } @@ -72,5 +75,6 @@ export const createInMemoryWalletStores = (): WalletStores => ({ tip: new InMemoryTipStore(), transactions: new InMemoryTransactionsStore(), unspendableUtxo: new InMemoryUnspendableUtxoStore(), - utxo: new InMemoryUtxoStore() + utxo: new InMemoryUtxoStore(), + volatileTransactions: new InMemoryVolatileTransactionsStore() }); diff --git a/packages/wallet/src/persistence/pouchdbStores/pouchdbWalletStores.ts b/packages/wallet/src/persistence/pouchdbStores/pouchdbWalletStores.ts index 3f72dece593..24c6340981a 100644 --- a/packages/wallet/src/persistence/pouchdbStores/pouchdbWalletStores.ts +++ b/packages/wallet/src/persistence/pouchdbStores/pouchdbWalletStores.ts @@ -10,6 +10,7 @@ import { import { EMPTY, combineLatest, map } from 'rxjs'; import { GroupedAddress } from '../../KeyManagement'; import { Logger } from 'ts-log'; +import { NewTxAlonzoWithSlot } from '../../services'; import { PouchdbCollectionStore } from './PouchdbCollectionStore'; import { PouchdbDocumentStore } from './PouchdbDocumentStore'; import { PouchdbKeyValueStore } from './PouchdbKeyValueStore'; @@ -25,6 +26,7 @@ export class PouchdbTimeSettingsStore extends PouchdbDocumentStore {} export class PouchdbAddressesStore extends PouchdbDocumentStore {} export class PouchdbInFlightTransactionsStore extends PouchdbDocumentStore {} +export class PouchdbVolatileTransactionsStore extends PouchdbDocumentStore {} export class PouchdbTransactionsStore extends PouchdbCollectionStore {} export class PouchdbUtxoStore extends PouchdbCollectionStore {} @@ -56,7 +58,6 @@ export const createPouchdbWalletStores = ( return combineLatest([ destroyDocumentsDb, this.transactions.destroy(), - this.inFlightTransactions.destroy(), this.utxo.destroy(), this.unspendableUtxo.destroy(), this.rewardsHistory.destroy(), @@ -90,6 +91,7 @@ export const createPouchdbWalletStores = ( logger ), unspendableUtxo: new PouchdbUtxoStore({ dbName: `${baseDbName}UnspendableUtxo` }, logger), - utxo: new PouchdbUtxoStore({ dbName: `${baseDbName}Utxo` }, logger) + utxo: new PouchdbUtxoStore({ dbName: `${baseDbName}Utxo` }, logger), + volatileTransactions: new PouchdbVolatileTransactionsStore(docsDbName, 'volatileTransactions', logger) }; }; diff --git a/packages/wallet/src/persistence/types.ts b/packages/wallet/src/persistence/types.ts index 40be8c773a0..c18718ada65 100644 --- a/packages/wallet/src/persistence/types.ts +++ b/packages/wallet/src/persistence/types.ts @@ -8,6 +8,7 @@ import { TimeSettings } from '@cardano-sdk/core'; import { GroupedAddress } from '../KeyManagement'; +import { NewTxAlonzoWithSlot } from '../services'; import { Observable } from 'rxjs'; export interface Destroyable { @@ -81,6 +82,7 @@ export interface WalletStores extends Destroyable { unspendableUtxo: CollectionStore; transactions: OrderedCollectionStore; inFlightTransactions: DocumentStore; + volatileTransactions: DocumentStore; rewardsHistory: KeyValueStore; rewardsBalances: KeyValueStore; stakePools: KeyValueStore; diff --git a/packages/wallet/src/services/TransactionReemitter.ts b/packages/wallet/src/services/TransactionReemitter.ts new file mode 100644 index 00000000000..3941d21e23d --- /dev/null +++ b/packages/wallet/src/services/TransactionReemitter.ts @@ -0,0 +1,113 @@ +import { Cardano } from '@cardano-sdk/core'; +import { Logger, dummyLogger } from 'ts-log'; +import { Observable, filter, from, map, merge, mergeMap, scan, withLatestFrom } from 'rxjs'; + +import { CustomError } from 'ts-custom-error'; +import { DocumentStore } from '../persistence'; +import { NewTxAlonzoWithSlot } from './types'; + +export enum TransactionReemitErrorCode { + invalidHereafter = 'invalidHereafter', + notFound = 'notFound' +} +class TransactionReemitError extends CustomError { + code: TransactionReemitErrorCode; + public constructor(code: TransactionReemitErrorCode, message: string) { + super(message); + this.code = code; + } +} + +interface TransactionReemitterProps { + rollback$: Observable; + confirmed$: Observable; + submitting$: Observable; + store: DocumentStore; + tipSlot$: Observable; + stabilityWindowSlotsCount?: number; + logger?: Logger; +} + +enum txSource { + store, + confirmed, + submitting +} + +// 3k/f (where k is the security parameter in genesis, and f is the active slot co-efficient parameter +// in genesis that determines the probability for amount of blocks created in an epoch.) +const kStabilityWindowSlotsCount = 129_600; // 3k/f on current mainnet + +export const createTransactionReemitter = ({ + rollback$, + confirmed$, + submitting$, + store, + tipSlot$, + stabilityWindowSlotsCount = kStabilityWindowSlotsCount, + logger = dummyLogger +}: TransactionReemitterProps): Observable => { + const volatileTransactions$ = merge( + store.get().pipe( + mergeMap((txs) => from(txs)), + map((tx) => ({ source: txSource.store, tx })) + ), + confirmed$.pipe(map((tx) => ({ source: txSource.confirmed, tx }))), + submitting$.pipe(map((tx) => ({ source: txSource.submitting, tx: { ...tx, slot: null! } }))) + ).pipe( + scan((volatiles, { tx, source }) => { + switch (source) { + case txSource.store: { + // Do not calculate stability window for old transactions coming from the store + volatiles = [...volatiles, tx]; + break; + } + case txSource.submitting: { + // Transactions in submitting are the ones reemitted. Remove them from volatiles + volatiles = volatiles.filter((v) => v.id !== tx.id); + store.set(volatiles); + break; + } + case txSource.confirmed: { + const oldestAcceptedSlot = tx.slot > stabilityWindowSlotsCount ? tx.slot - stabilityWindowSlotsCount : 0; + // Remove transactions considered stable + volatiles = [...volatiles.filter(({ slot }) => slot > oldestAcceptedSlot), tx]; + store.set(volatiles); + break; + } + } + return volatiles; + }, [] as NewTxAlonzoWithSlot[]) + ); + + return rollback$.pipe( + withLatestFrom(tipSlot$), + map(([tx, tipSlot]) => { + const invalidHereafter = tx.body?.validityInterval?.invalidHereafter; + if (invalidHereafter && tipSlot > invalidHereafter) { + const err = new TransactionReemitError( + TransactionReemitErrorCode.invalidHereafter, + `Rolled back transaction with id ${tx.id} is no longer valid` + ); + logger.error(err.message, err.code); + return; + } + return tx; + }), + filter((tx) => !!tx), + withLatestFrom(volatileTransactions$), + map(([tx, volatiles]) => { + // Get the confirmed NewTxAlonzo transaction to be retried + const reemitTx = volatiles.find((txVolatile) => txVolatile.id === tx!.id); + if (!reemitTx) { + const err = new TransactionReemitError( + TransactionReemitErrorCode.notFound, + `Could not find confirmed transaction with id ${tx!.id} that was rolled back` + ); + logger.error(err.message, err.code); + } + return reemitTx!; + }), + filter((tx) => !!tx) + ); +}; diff --git a/packages/wallet/src/services/TransactionsTracker.ts b/packages/wallet/src/services/TransactionsTracker.ts index 9f72fb34c29..c61908dcbe5 100644 --- a/packages/wallet/src/services/TransactionsTracker.ts +++ b/packages/wallet/src/services/TransactionsTracker.ts @@ -23,7 +23,7 @@ import { takeUntil, tap } from 'rxjs'; -import { FailedTx, TransactionFailure, TransactionsTracker } from './types'; +import { FailedTx, NewTxAlonzoWithSlot, TransactionFailure, TransactionsTracker } from './types'; import { RetryBackoffConfig } from 'backoff-rxjs'; import { Shutdown } from '@cardano-sdk/util'; import { TrackerSubject } from '@cardano-sdk/util-rxjs'; @@ -47,7 +47,8 @@ export interface TransactionsTrackerProps { } export interface TransactionsTrackerInternals { - transactionsSource$?: Observable; + transactionsSource$: Observable; + rollback$: Observable; } export const createAddressTransactionsProvider = ( @@ -56,48 +57,58 @@ export const createAddressTransactionsProvider = ( retryBackoffConfig: RetryBackoffConfig, tipBlockHeight$: Observable, store: OrderedCollectionStore -): Observable => { +): TransactionsTrackerInternals => { + const rollback$ = new Subject(); const storedTransactions$ = store.getAll().pipe(share()); - return concat( - storedTransactions$, - combineLatest([addresses$, storedTransactions$.pipe(defaultIfEmpty([] as Cardano.TxAlonzo[]))]).pipe( - switchMap(([addresses, storedTransactions]) => { - let localTransactions = [...storedTransactions]; - return coldObservableProvider({ - // Do not re-fetch transactions twice on load when tipBlockHeight$ loads from storage first - // It should also help when using poor internet connection. - // Caveat is that local transactions might get out of date... - combinator: exhaustMap, - equals: transactionsEquals, - provider: async () => { - // eslint-disable-next-line no-constant-condition - while (true) { - const lastStoredTransaction: Cardano.TxAlonzo | undefined = - localTransactions[localTransactions.length - 1]; - const newTransactions = await chainHistoryProvider.transactionsByAddresses({ - addresses, - sinceBlock: lastStoredTransaction?.blockHeader.blockNo - }); - const duplicateTransactions = - lastStoredTransaction && intersectionBy(localTransactions, newTransactions, (tx) => tx.id); - if (typeof duplicateTransactions !== 'undefined' && duplicateTransactions.length === 0) { - // Rollback by 1 block, try again in next loop iteration - localTransactions = localTransactions.filter( - ({ blockHeader: { blockNo } }) => blockNo < lastStoredTransaction.blockHeader.blockNo - ); - } else { - localTransactions = unionBy(localTransactions, newTransactions, (tx) => tx.id); - store.setAll(localTransactions); - return localTransactions; + return { + rollback$: rollback$.asObservable(), + transactionsSource$: concat( + storedTransactions$, + combineLatest([addresses$, storedTransactions$.pipe(defaultIfEmpty([] as Cardano.TxAlonzo[]))]).pipe( + switchMap(([addresses, storedTransactions]) => { + let localTransactions: Cardano.TxAlonzo[] = [...storedTransactions]; + return coldObservableProvider({ + // Do not re-fetch transactions twice on load when tipBlockHeight$ loads from storage first + // It should also help when using poor internet connection. + // Caveat is that local transactions might get out of date... + combinator: exhaustMap, + equals: transactionsEquals, + provider: async () => { + // eslint-disable-next-line no-constant-condition + while (true) { + const lastStoredTransaction: Cardano.TxAlonzo | undefined = + localTransactions[localTransactions.length - 1]; + const newTransactions = await chainHistoryProvider.transactionsByAddresses({ + addresses, + sinceBlock: lastStoredTransaction?.blockHeader.blockNo + }); + const duplicateTransactions = + lastStoredTransaction && intersectionBy(localTransactions, newTransactions, (tx) => tx.id); + if (typeof duplicateTransactions !== 'undefined' && duplicateTransactions.length === 0) { + from( + localTransactions.filter( + ({ blockHeader: { blockNo } }) => blockNo >= lastStoredTransaction.blockHeader.blockNo + ) + ).subscribe(rollback$); + + // Rollback by 1 block, try again in next loop iteration + localTransactions = localTransactions.filter( + ({ blockHeader: { blockNo } }) => blockNo < lastStoredTransaction.blockHeader.blockNo + ); + } else { + localTransactions = unionBy(localTransactions, newTransactions, (tx) => tx.id); + store.setAll(localTransactions); + return localTransactions; + } } - } - }, - retryBackoffConfig, - trigger$: tipBlockHeight$ - }); - }) + }, + retryBackoffConfig, + trigger$: tipBlockHeight$ + }); + }) + ) ) - ); + }; }; const createHistoricalTransactionsTrackerSubject = ( @@ -139,29 +150,27 @@ export const createTransactionsTracker = ( transactionsHistoryStore: transactionsStore, inFlightTransactionsStore: newTransactionsStore }: TransactionsTrackerProps, - { - transactionsSource$ = new TrackerSubject( - createAddressTransactionsProvider( - chainHistoryProvider, - addresses$, - retryBackoffConfig, - distinctBlock(tip$), - transactionsStore - ) - ) - }: TransactionsTrackerInternals = {} + { transactionsSource$: txSource$, rollback$ }: TransactionsTrackerInternals = createAddressTransactionsProvider( + chainHistoryProvider, + addresses$, + retryBackoffConfig, + distinctBlock(tip$), + transactionsStore + ) ): TransactionsTracker & Shutdown => { const submitting$ = merge( newTransactionsStore.get().pipe(mergeMap((transactions) => from(transactions))), newSubmitting$ ).pipe(share()); + const transactionsSource$ = new TrackerSubject(txSource$); + const historicalTransactions$ = createHistoricalTransactionsTrackerSubject(transactionsSource$); - const txConfirmed$ = (tx: Cardano.NewTxAlonzo) => + const txConfirmed$ = (tx: Cardano.NewTxAlonzo): Observable => newTransactions$(historicalTransactions$).pipe( filter((historyTx) => historyTx.id === tx.id), take(1), - map(() => tx) + map((historyTx) => ({ ...tx, slot: historyTx.blockHeader.slot })) ); const failed$ = new Subject(); @@ -212,7 +221,7 @@ export const createTransactionsTracker = ( ) ); - const confirmed$ = new Subject(); + const confirmed$ = new Subject(); const confirmedSubscription = submitting$ .pipe(mergeMap((tx) => txConfirmed$(tx).pipe(takeUntil(txFailed$(tx))))) .subscribe(confirmed$); @@ -226,6 +235,7 @@ export const createTransactionsTracker = ( pending$, submitting$ }, + rollback$, shutdown: () => { inFlight$.complete(); confirmedSubscription.unsubscribe(); diff --git a/packages/wallet/src/services/index.ts b/packages/wallet/src/services/index.ts index d82db53c891..9cc80fdb55d 100644 --- a/packages/wallet/src/services/index.ts +++ b/packages/wallet/src/services/index.ts @@ -9,3 +9,4 @@ export * from './ProviderTracker'; export * from './WalletUtil'; export * from './EpochTracker'; export * from './TipTracker'; +export * from './TransactionReemitter'; diff --git a/packages/wallet/src/services/types.ts b/packages/wallet/src/services/types.ts index 69a47824c48..fec2bba2229 100644 --- a/packages/wallet/src/services/types.ts +++ b/packages/wallet/src/services/types.ts @@ -51,14 +51,17 @@ export interface FailedTx { error?: Cardano.TxSubmissionError; } +export type NewTxAlonzoWithSlot = Cardano.NewTxAlonzo & { slot: Cardano.PartialBlockHeader['slot'] }; + export interface TransactionsTracker { readonly history$: Observable; + readonly rollback$: Observable; readonly outgoing: { readonly inFlight$: Observable; readonly submitting$: Observable; readonly pending$: Observable; readonly failed$: Observable; - readonly confirmed$: Observable; + readonly confirmed$: Observable; }; } diff --git a/packages/wallet/test/SingleAddressWallet/rollback.test.ts b/packages/wallet/test/SingleAddressWallet/rollback.test.ts new file mode 100644 index 00000000000..a22cf7d6c19 --- /dev/null +++ b/packages/wallet/test/SingleAddressWallet/rollback.test.ts @@ -0,0 +1,170 @@ +import { + Cardano, + ChainHistoryProvider, + NetworkInfoProvider, + RewardsProvider, + TxSubmitProvider, + UtxoProvider +} from '@cardano-sdk/core'; +import { createStubStakePoolProvider } from '@cardano-sdk/util-dev'; +import { filter, firstValueFrom } from 'rxjs'; + +import * as mocks from '../mocks'; +import { ConnectionStatusTracker, KeyManagement, PollingConfig, SingleAddressWallet, setupWallet } from '../../src'; +import { WalletStores, createInMemoryWalletStores } from '../../src/persistence'; +import { waitForWalletStateSettle } from '../util'; + +const name = 'Test Wallet'; +const address = mocks.utxo[0][0].address!; +const rewardAccount = mocks.rewardAccount; + +interface Providers { + rewardsProvider: RewardsProvider; + utxoProvider: UtxoProvider; + chainHistoryProvider: ChainHistoryProvider; + networkInfoProvider: NetworkInfoProvider; + connectionStatusTracker$?: ConnectionStatusTracker; + txSubmitProvider: TxSubmitProvider; +} + +const createWallet = async (stores: WalletStores, providers: Providers, pollingConfig?: PollingConfig) => { + const { wallet } = await setupWallet({ + createKeyAgent: async (dependencies) => { + const groupedAddress: KeyManagement.GroupedAddress = { + accountIndex: 0, + address, + index: 0, + networkId: Cardano.NetworkId.testnet, + rewardAccount, + type: KeyManagement.AddressType.External + }; + const asyncKeyAgent = await mocks.testAsyncKeyAgent([groupedAddress], dependencies); + asyncKeyAgent.deriveAddress = jest.fn().mockResolvedValue(groupedAddress); + return asyncKeyAgent; + }, + createWallet: async (keyAgent) => { + const { + txSubmitProvider, + rewardsProvider, + utxoProvider, + chainHistoryProvider, + networkInfoProvider, + connectionStatusTracker$ + } = providers; + const assetProvider = mocks.mockAssetProvider(); + const stakePoolProvider = createStubStakePoolProvider(); + + return new SingleAddressWallet( + { name, polling: pollingConfig }, + { + assetProvider, + chainHistoryProvider, + connectionStatusTracker$, + keyAgent, + networkInfoProvider, + rewardsProvider, + stakePoolProvider, + stores, + txSubmitProvider, + utxoProvider + } + ); + } + }); + return wallet; +}; + +const txOut: Cardano.TxOut = { + address: Cardano.Address( + 'addr_test1qz2fxv2umyhttkxyxp8x0dlpdt3k6cwng5pxj3jhsydzer3jcu5d8ps7zex2k2xt3uqxgjqnnj83ws8lhrn648jjxtwq2ytjqp' + ), + value: { + coins: 10n + } +}; + +const txBody: Cardano.NewTxBodyAlonzo = { + fee: 10n, + inputs: [ + { + index: 0, + txId: Cardano.TransactionId('0f3abbc8fc19c2e61bab6059bf8a466e6e754833a08a62a6c56fe0e78f19d9d5') + } + ], + outputs: [txOut], + validityInterval: { + invalidBefore: 100, + invalidHereafter: 1000 + } +}; + +const vkey = '6199186adb51974690d7247d2646097d2c62763b767b528816fb7ed3f9f55d39'; +const signature = + // eslint-disable-next-line max-len + 'bdea87fca1b4b4df8a9b8fb4183c0fab2f8261eb6c5e4bc42c800bb9c8918755bdea87fca1b4b4df8a9b8fb4183c0fab2f8261eb6c5e4bc42c800bb9c8918755'; +const tx: Cardano.NewTxAlonzo = { + body: txBody, + id: Cardano.TransactionId('de9d33f66cffff721673219b19470aec81d96bc9253182369e41eec58389a448'), + witness: { + signatures: new Map([[Cardano.Ed25519PublicKey(vkey), Cardano.Ed25519Signature(signature)]]) + } +}; + +describe('SingleAddressWallet rollback', () => { + it('Rollback transaction is resubmitteed', async () => { + const stores = createInMemoryWalletStores(); + const rewardsProvider = mocks.mockRewardsProvider(); + const networkInfoProvider = mocks.mockNetworkInfoProvider(); + const chainHistoryProvider = mocks.mockChainHistoryProvider(); + const utxoProvider = mocks.mockUtxoProvider(); + const txSubmitProvider = mocks.mockTxSubmitProvider(); + + const secondTip = { + blockNo: 1_111_112, + hash: '10d64cc11e9b20e15b6c46aa7b1fed11246f438e62225655a30ea47bf8cc22d0', + slot: mocks.ledgerTip.slot + 1 + }; + + networkInfoProvider.ledgerTip = jest.fn().mockResolvedValueOnce(mocks.ledgerTip).mockResolvedValueOnce(secondTip); + + const histTx1 = mocks.queryTransactionsResult[0]; + const rollBackTx = { ...mocks.queryTransactionsResult[1], id: tx.id }; + rollBackTx.body.validityInterval.invalidHereafter = secondTip.slot + 1; + + const newTx = { + ...rollBackTx, + id: Cardano.TransactionId('fff4edf9712d2b619edb6ac86861fe93a730693183a262b165fcc1ba1bc99caa') + }; + + chainHistoryProvider.transactionsByAddresses + .mockResolvedValueOnce([histTx1, rollBackTx]) + .mockResolvedValueOnce([newTx]) + .mockResolvedValueOnce([histTx1, newTx]); + + stores.volatileTransactions.set([ + { + ...tx, + slot: rollBackTx.blockHeader.slot + } + ]); + + const wallet = await createWallet( + stores, + { + chainHistoryProvider, + networkInfoProvider, + rewardsProvider, + txSubmitProvider, + utxoProvider + }, + { consideredOutOfSyncAfter: 5, interval: 0 } + ); + + await firstValueFrom(wallet.transactions.history$.pipe(filter((v) => v.some(({ id }) => id === newTx.id)))); + expect(txSubmitProvider.submitTx).toHaveBeenCalled(); + + await waitForWalletStateSettle(wallet); + + wallet.shutdown(); + }); +}); diff --git a/packages/wallet/test/services/TransactionReemiter.test.ts b/packages/wallet/test/services/TransactionReemiter.test.ts new file mode 100644 index 00000000000..0b941f0fe09 --- /dev/null +++ b/packages/wallet/test/services/TransactionReemiter.test.ts @@ -0,0 +1,172 @@ +import { Cardano } from '@cardano-sdk/core'; +import { createTestScheduler } from '@cardano-sdk/util-dev'; + +import { InMemoryVolatileTransactionsStore, WalletStores } from '../../src/persistence'; +import { Logger } from 'ts-log'; +import { NewTxAlonzoWithSlot, TransactionReemitErrorCode, createTransactionReemitter } from '../../src'; + +describe('TransactionReemiter', () => { + let store: WalletStores['volatileTransactions']; + let volatileTransactions: NewTxAlonzoWithSlot[]; + + beforeEach(() => { + store = new InMemoryVolatileTransactionsStore(); + store.set = jest.fn(); + volatileTransactions = [ + { + body: { validityInterval: { invalidHereafter: 1000 } }, + id: Cardano.TransactionId('6804edf9712d2b619edb6ac86861fe93a730693183a262b165fcc1ba1bc99cad'), + slot: 100 + }, + { + body: { validityInterval: { invalidHereafter: 1000 } }, + id: Cardano.TransactionId('7804edf9712d2b619edb6ac86861fe93a730693183a262b165fcc1ba1bc99cad'), + slot: 200 + }, + { + body: { validityInterval: { invalidHereafter: 1000 } }, + id: Cardano.TransactionId('8804edf9712d2b619edb6ac86861fe93a730693183a262b165fcc1ba1bc99cad'), + slot: 300 + }, + { + body: { validityInterval: { invalidHereafter: 1000 } }, + id: Cardano.TransactionId('9904edf9712d2b619edb6ac86861fe93a730693183a262b165fcc1ba1bc99cad'), + slot: 400 + } + ] as NewTxAlonzoWithSlot[]; + }); + + it('Stored volatile transactions are fetched on init', () => { + const storeTransaction = volatileTransactions[0]; + createTestScheduler().run(({ hot, cold, expectObservable }) => { + store.get = jest.fn(() => cold('a|', { a: [storeTransaction] })); + const tipSlot$ = hot('-|'); + const confirmed$ = cold('-|'); + const rollback$ = cold('-|'); + const submitting$ = cold('-|'); + const transactionReemiter = createTransactionReemitter({ confirmed$, rollback$, store, submitting$, tipSlot$ }); + expectObservable(transactionReemiter).toBe('-|'); + }); + expect(store.get).toHaveBeenCalledTimes(1); + expect(store.set).not.toHaveBeenCalledTimes(1); // already in store + }); + + it('Merges stored transacions with confirmed transactions and adds them all to store', () => { + const storeTransaction = volatileTransactions[0]; + createTestScheduler().run(({ hot, cold, expectObservable }) => { + store.get = jest.fn(() => cold('a|', { a: [storeTransaction] })); + const tipSlot$ = hot('-|'); + const confirmed$ = cold('-bc|', { b: volatileTransactions[1], c: volatileTransactions[2] }); + const rollback$ = cold('--|'); + const submitting$ = cold('--|'); + const transactionReemiter = createTransactionReemitter({ confirmed$, rollback$, store, submitting$, tipSlot$ }); + expectObservable(transactionReemiter).toBe('--|'); + }); + expect(store.set).toHaveBeenCalledTimes(2); + expect(store.set).toHaveBeenLastCalledWith(volatileTransactions.slice(0, 3)); + }); + + it('Removes transaction from volatiles if it is reported as submitting', () => { + const storeTransaction = volatileTransactions; + createTestScheduler().run(({ hot, cold, expectObservable }) => { + store.get = jest.fn(() => cold('a|', { a: storeTransaction })); + const tipSlot$ = hot('--|'); + const confirmed$ = cold('--|'); + const rollback$ = cold('--|'); + const submitting$ = cold('-b|', { b: volatileTransactions[0] }); + const transactionReemiter = createTransactionReemitter({ confirmed$, rollback$, store, submitting$, tipSlot$ }); + expectObservable(transactionReemiter).toBe('--|'); + }); + expect(store.set).toHaveBeenCalledTimes(1); + expect(store.set).toHaveBeenLastCalledWith(volatileTransactions.slice(1)); + }); + + it('Uses stability window to remove transactions no longer volatile', () => { + const [volatileSlot100, volatileSlot200, volatileSlot300] = volatileTransactions; + createTestScheduler().run(({ hot, cold, expectObservable }) => { + const tipSlot$ = hot('---|'); + const confirmed$ = cold('abc|', { + a: volatileSlot100, + b: volatileSlot200, + c: volatileSlot300 + }); + const rollback$ = cold('---|'); + const submitting$ = cold('---|'); + const transactionReemiter = createTransactionReemitter({ + confirmed$, + rollback$, + stabilityWindowSlotsCount: 200, + store, + submitting$, + tipSlot$ + }); + expectObservable(transactionReemiter).toBe('---|'); + }); + expect(store.set).toHaveBeenCalledTimes(3); + expect(store.set).toHaveBeenLastCalledWith(volatileTransactions.slice(1, 3)); + }); + + it('Emits transactions that were rolled back and still valid', () => { + const LAST_TIP_SLOT = 400; + const [volatileA, volatileB, volatileC, volatileD] = volatileTransactions; + const rollbackA: Cardano.TxAlonzo = { body: volatileA.body, id: volatileA.id } as Cardano.TxAlonzo; + const rollbackC: Cardano.TxAlonzo = { + body: { validityInterval: { invalidHereafter: LAST_TIP_SLOT - 1 } }, + id: volatileC.id + } as Cardano.TxAlonzo; + const rollbackD: Cardano.TxAlonzo = { body: volatileD.body, id: volatileD.id } as Cardano.TxAlonzo; + + const logger = {} as Logger; + logger.error = jest.fn(); + + createTestScheduler().run(({ hot, cold, expectObservable }) => { + const tipSlot$ = hot('x--------|', { x: LAST_TIP_SLOT }); + const confirmed$ = cold('a-b-c-d--|', { + a: volatileA, + b: volatileB, + c: volatileC, + d: volatileD + }); + const rollback$ = cold('--a--c--d|', { a: rollbackA, c: rollbackC, d: rollbackD }); + const submitting$ = cold('---------|'); + const transactionReemiter = createTransactionReemitter({ + confirmed$, + logger, + rollback$, + store, + submitting$, + tipSlot$ + }); + expectObservable(transactionReemiter).toBe('--a-----d|', { a: volatileA, d: volatileD }); + }); + + expect(logger.error).toHaveBeenCalledWith(expect.anything(), TransactionReemitErrorCode.invalidHereafter); + }); + + it('Logs error message for rolledback transactions not found in volatiles', () => { + const [volatileA, volatileB, volatileC] = volatileTransactions; + const rollbackC: Cardano.TxAlonzo = { body: volatileC.body, id: volatileC.id } as Cardano.TxAlonzo; + const logger = {} as Logger; + logger.error = jest.fn(); + + createTestScheduler().run(({ hot, cold, expectObservable }) => { + const tipSlot$ = hot('x--|', { x: 300 }); + const confirmed$ = cold('ab-|', { + a: volatileA, + b: volatileB + }); + const rollback$ = cold('--c|', { c: rollbackC }); + const submitting$ = cold('---|'); + const transactionReemiter = createTransactionReemitter({ + confirmed$, + logger, + rollback$, + store, + submitting$, + tipSlot$ + }); + expectObservable(transactionReemiter).toBe('---|'); + }); + expect(logger.error).toHaveBeenCalledWith(expect.anything(), TransactionReemitErrorCode.notFound); + }); +}); diff --git a/packages/wallet/test/services/TransactionsTracker.test.ts b/packages/wallet/test/services/TransactionsTracker.test.ts index 9b338dbcf63..886db87624a 100644 --- a/packages/wallet/test/services/TransactionsTracker.test.ts +++ b/packages/wallet/test/services/TransactionsTracker.test.ts @@ -1,9 +1,9 @@ import { Cardano, ChainHistoryProvider } from '@cardano-sdk/core'; import { ChainHistoryProviderStub, mockChainHistoryProvider, queryTransactionsResult } from '../mocks'; +import { EMPTY, bufferCount, firstValueFrom, of } from 'rxjs'; import { FailedTx, TransactionFailure, createAddressTransactionsProvider, createTransactionsTracker } from '../../src'; import { InMemoryInFlightTransactionsStore, InMemoryTransactionsStore, WalletStores } from '../../src/persistence'; import { RetryBackoffConfig } from 'backoff-rxjs'; -import { bufferCount, firstValueFrom, of } from 'rxjs'; import { createTestScheduler } from '@cardano-sdk/util-dev'; import delay from 'delay'; @@ -28,7 +28,7 @@ describe('TransactionsTracker', () => { retryBackoffConfig, tip$, store - ); + ).transactionsSource$; expect(await firstValueFrom(provider$)).toEqual(queryTransactionsResult); expect(store.setAll).toBeCalledTimes(1); expect(store.setAll).toBeCalledWith(queryTransactionsResult); @@ -45,7 +45,7 @@ describe('TransactionsTracker', () => { retryBackoffConfig, tip$, store - ); + ).transactionsSource$; expect(await firstValueFrom(provider$.pipe(bufferCount(2)))).toEqual([ [queryTransactionsResult[0]], queryTransactionsResult @@ -70,11 +70,12 @@ describe('TransactionsTracker', () => { retryBackoffConfig, tip$, store - ); + ).transactionsSource$; expect(await firstValueFrom(provider$.pipe(bufferCount(2)))).toEqual([ queryTransactionsResult, [queryTransactionsResult[0]] ]); + // expect(await firstValueFrom(rollback$)).toEqual(queryTransactionsResult[1]); expect(store.setAll).toBeCalledTimes(2); expect(chainHistoryProvider.transactionsByAddresses).toBeCalledTimes(2); expect(chainHistoryProvider.transactionsByAddresses).nthCalledWith(1, { @@ -132,13 +133,14 @@ describe('TransactionsTracker', () => { transactionsHistoryStore: transactionsStore }, { + rollback$: EMPTY, transactionsSource$ } ); expectObservable(transactionsTracker.outgoing.submitting$).toBe('-a--|', { a: outgoingTx }); expectObservable(transactionsTracker.outgoing.pending$).toBe('--a-|', { a: outgoingTx }); expectObservable(transactionsTracker.outgoing.confirmed$, confirmedSubscription).toBe('---a|', { - a: outgoingTx + a: { ...outgoingTx, slot: outgoingTx.blockHeader.slot } }); expectObservable(transactionsTracker.outgoing.inFlight$).toBe('ab-c|', { a: [], b: [outgoingTx], c: [] }); expectObservable(transactionsTracker.outgoing.failed$).toBe('----|'); @@ -175,6 +177,7 @@ describe('TransactionsTracker', () => { transactionsHistoryStore: transactionsStore }, { + rollback$: EMPTY, transactionsSource$ } ); @@ -217,6 +220,7 @@ describe('TransactionsTracker', () => { transactionsHistoryStore: transactionsStore }, { + rollback$: EMPTY, transactionsSource$ } ); @@ -261,6 +265,7 @@ describe('TransactionsTracker', () => { transactionsHistoryStore: transactionsStore }, { + rollback$: EMPTY, transactionsSource$ } ); @@ -321,12 +326,13 @@ describe('TransactionsTracker', () => { transactionsHistoryStore: transactionsStore }, { + rollback$: EMPTY, transactionsSource$ } ); expectObservable(transactionsTracker.outgoing.submitting$).toBe('-a--|', { a: storedInFlightTransaction }); expectObservable(transactionsTracker.outgoing.confirmed$).toBe('---a|', { - a: storedInFlightTransaction + a: { ...storedInFlightTransaction, slot: storedInFlightTransaction.blockHeader.slot } }); expectObservable(transactionsTracker.outgoing.inFlight$).toBe('ab-a|', { a: [], @@ -391,6 +397,7 @@ describe('TransactionsTracker', () => { transactionsHistoryStore: transactionsStore }, { + rollback$: EMPTY, transactionsSource$ } ); diff --git a/packages/web-extension/src/observableWallet/util.ts b/packages/web-extension/src/observableWallet/util.ts index aafe15ac62e..0a86e038bd4 100644 --- a/packages/web-extension/src/observableWallet/util.ts +++ b/packages/web-extension/src/observableWallet/util.ts @@ -46,7 +46,8 @@ export const observableWalletProperties: RemoteApiProperties = inFlight$: RemoteApiPropertyType.HotObservable, pending$: RemoteApiPropertyType.HotObservable, submitting$: RemoteApiPropertyType.HotObservable - } + }, + rollback$: RemoteApiPropertyType.HotObservable }, utxo: { available$: RemoteApiPropertyType.HotObservable,