Skip to content

Commit

Permalink
feat!: resubmit rollback transactions
Browse files Browse the repository at this point in the history
TransactionsTracker detects and emits rollbacks of completed transactions.
TransactionsReemiter keeps track of volatile transactions and rollbacks, and
reemits transactions if they still are in the validity period.
SingleAddressWallet resubmits transactions from TransactionsReemiter.

BREAKING CHANGE:
  - (web-extension) observableWalletProperties has new `transactions.rollback$` property
  - (wallet) createAddressTransactionsProvider returns an object with two observables
    `{rollback$, transactionsSource$}`, instead of only the transactionsSource$ observable
  - (wallet) TransactionsTracker interface contains new `rollback$` property
  - (wallet) TransactionsTracker interface `$confirmed` Observable emits `NewTxAlonzoWithSlot`
    object instead of NewTxAlonzo
  • Loading branch information
mirceahasegan committed Aug 3, 2022
1 parent cac51e7 commit 2a4ccb0
Show file tree
Hide file tree
Showing 12 changed files with 576 additions and 66 deletions.
25 changes: 25 additions & 0 deletions packages/wallet/src/SingleAddressWallet.ts
Expand Up @@ -53,14 +53,19 @@ import {
import { BehaviorObservable, TrackerSubject } from '@cardano-sdk/util-rxjs';
import {
BehaviorSubject,
EMPTY,
Subject,
Subscription,
catchError,
combineLatest,
concat,
distinctUntilChanged,
filter,
firstValueFrom,
from,
lastValueFrom,
map,
mergeMap,
take,
tap
} from 'rxjs';
Expand All @@ -73,6 +78,7 @@ import { TrackedUtxoProvider } from './services/ProviderTracker/TrackedUtxoProvi
import { TxInternals, createTransactionInternals, ensureValidityInterval } from './Transaction';
import { WalletStores, createInMemoryWalletStores } from './persistence';
import { cip30signData } from './KeyManagement/cip8';
import { createTransactionReemitter } from './services/TransactionReemitter';
import isEqual from 'lodash/isEqual';

export interface SingleAddressWalletProps {
Expand Down Expand Up @@ -105,6 +111,7 @@ export class SingleAddressWallet implements ObservableWallet {
pending$: new Subject<Cardano.NewTxAlonzo>(),
submitting$: new Subject<Cardano.NewTxAlonzo>()
};
#resubmitSubscription: Subscription;

readonly keyAgent: AsyncKeyAgent;
readonly currentEpoch$: TrackerSubject<EpochInfo>;
Expand Down Expand Up @@ -288,6 +295,23 @@ export class SingleAddressWallet implements ObservableWallet {
tip$: this.tip$,
transactionsHistoryStore: stores.transactions
});

this.#resubmitSubscription = createTransactionReemitter({
confirmed$: this.transactions.outgoing.confirmed$,
rollback$: this.transactions.rollback$,
store: stores.volatileTransactions,
submitting$: this.transactions.outgoing.submitting$,
tipSlot$: this.tip$.pipe(map((tip) => tip.slot))
})
.pipe(
mergeMap((transaction) => from(this.submitTx(transaction))),
catchError((err) => {
this.#logger.error('Failed to resubmit transaction', err);
return EMPTY;
})
)
.subscribe();

this.utxo = createUtxoTracker({
addresses$,
retryBackoffConfig,
Expand Down Expand Up @@ -408,6 +432,7 @@ export class SingleAddressWallet implements ObservableWallet {
this.#newTransactions.failedToSubmit$.complete();
this.#newTransactions.pending$.complete();
this.#newTransactions.submitting$.complete();
this.#resubmitSubscription.unsubscribe();
}

#prepareTx(props: InitializeTxProps) {
Expand Down
Expand Up @@ -12,6 +12,7 @@ import { GroupedAddress } from '../../KeyManagement';
import { InMemoryCollectionStore } from './InMemoryCollectionStore';
import { InMemoryDocumentStore } from './InMemoryDocumentStore';
import { InMemoryKeyValueStore } from './InMemoryKeyValueStore';
import { NewTxAlonzoWithSlot } from '../../services';
import { WalletStores } from '../types';

export class InMemoryTipStore extends InMemoryDocumentStore<Cardano.Tip> {}
Expand All @@ -24,6 +25,7 @@ export class InMemoryTimeSettingsStore extends InMemoryDocumentStore<TimeSetting
export class InMemoryAssetsStore extends InMemoryDocumentStore<Assets> {}
export class InMemoryAddressesStore extends InMemoryDocumentStore<GroupedAddress[]> {}
export class InMemoryInFlightTransactionsStore extends InMemoryDocumentStore<Cardano.NewTxAlonzo[]> {}
export class InMemoryVolatileTransactionsStore extends InMemoryDocumentStore<NewTxAlonzoWithSlot[]> {}

export class InMemoryTransactionsStore extends InMemoryCollectionStore<Cardano.TxAlonzo> {}
export class InMemoryUtxoStore extends InMemoryCollectionStore<Cardano.Utxo> {}
Expand Down Expand Up @@ -54,6 +56,7 @@ export const createInMemoryWalletStores = (): WalletStores => ({
this.tip.destroy(),
this.transactions.destroy(),
this.inFlightTransactions.destroy(),
this.volatileTransactions.destroy(),
this.utxo.destroy()
]).pipe(map(() => void 0));
}
Expand All @@ -72,5 +75,6 @@ export const createInMemoryWalletStores = (): WalletStores => ({
tip: new InMemoryTipStore(),
transactions: new InMemoryTransactionsStore(),
unspendableUtxo: new InMemoryUnspendableUtxoStore(),
utxo: new InMemoryUtxoStore()
utxo: new InMemoryUtxoStore(),
volatileTransactions: new InMemoryVolatileTransactionsStore()
});
Expand Up @@ -10,6 +10,7 @@ import {
import { EMPTY, combineLatest, map } from 'rxjs';
import { GroupedAddress } from '../../KeyManagement';
import { Logger } from 'ts-log';
import { NewTxAlonzoWithSlot } from '../../services';
import { PouchdbCollectionStore } from './PouchdbCollectionStore';
import { PouchdbDocumentStore } from './PouchdbDocumentStore';
import { PouchdbKeyValueStore } from './PouchdbKeyValueStore';
Expand All @@ -25,6 +26,7 @@ export class PouchdbTimeSettingsStore extends PouchdbDocumentStore<TimeSettings[
export class PouchdbAssetsStore extends PouchdbDocumentStore<Assets> {}
export class PouchdbAddressesStore extends PouchdbDocumentStore<GroupedAddress[]> {}
export class PouchdbInFlightTransactionsStore extends PouchdbDocumentStore<Cardano.NewTxAlonzo[]> {}
export class PouchdbVolatileTransactionsStore extends PouchdbDocumentStore<NewTxAlonzoWithSlot[]> {}

export class PouchdbTransactionsStore extends PouchdbCollectionStore<Cardano.TxAlonzo> {}
export class PouchdbUtxoStore extends PouchdbCollectionStore<Cardano.Utxo> {}
Expand Down Expand Up @@ -56,7 +58,6 @@ export const createPouchdbWalletStores = (
return combineLatest([
destroyDocumentsDb,
this.transactions.destroy(),
this.inFlightTransactions.destroy(),
this.utxo.destroy(),
this.unspendableUtxo.destroy(),
this.rewardsHistory.destroy(),
Expand Down Expand Up @@ -90,6 +91,7 @@ export const createPouchdbWalletStores = (
logger
),
unspendableUtxo: new PouchdbUtxoStore({ dbName: `${baseDbName}UnspendableUtxo` }, logger),
utxo: new PouchdbUtxoStore({ dbName: `${baseDbName}Utxo` }, logger)
utxo: new PouchdbUtxoStore({ dbName: `${baseDbName}Utxo` }, logger),
volatileTransactions: new PouchdbVolatileTransactionsStore(docsDbName, 'volatileTransactions', logger)
};
};
2 changes: 2 additions & 0 deletions packages/wallet/src/persistence/types.ts
Expand Up @@ -8,6 +8,7 @@ import {
TimeSettings
} from '@cardano-sdk/core';
import { GroupedAddress } from '../KeyManagement';
import { NewTxAlonzoWithSlot } from '../services';
import { Observable } from 'rxjs';

export interface Destroyable {
Expand Down Expand Up @@ -81,6 +82,7 @@ export interface WalletStores extends Destroyable {
unspendableUtxo: CollectionStore<Cardano.Utxo>;
transactions: OrderedCollectionStore<Cardano.TxAlonzo>;
inFlightTransactions: DocumentStore<Cardano.NewTxAlonzo[]>;
volatileTransactions: DocumentStore<NewTxAlonzoWithSlot[]>;
rewardsHistory: KeyValueStore<Cardano.RewardAccount, EpochRewards[]>;
rewardsBalances: KeyValueStore<Cardano.RewardAccount, Cardano.Lovelace>;
stakePools: KeyValueStore<Cardano.PoolId, Cardano.StakePool>;
Expand Down
113 changes: 113 additions & 0 deletions packages/wallet/src/services/TransactionReemitter.ts
@@ -0,0 +1,113 @@
import { Cardano } from '@cardano-sdk/core';
import { Logger, dummyLogger } from 'ts-log';
import { Observable, filter, from, map, merge, mergeMap, scan, withLatestFrom } from 'rxjs';

import { CustomError } from 'ts-custom-error';
import { DocumentStore } from '../persistence';
import { NewTxAlonzoWithSlot } from './types';

export enum TransactionReemitErrorCode {
invalidHereafter = 'invalidHereafter',
notFound = 'notFound'
}
class TransactionReemitError extends CustomError {
code: TransactionReemitErrorCode;
public constructor(code: TransactionReemitErrorCode, message: string) {
super(message);
this.code = code;
}
}

interface TransactionReemitterProps {
rollback$: Observable<Cardano.TxAlonzo>;
confirmed$: Observable<NewTxAlonzoWithSlot>;
submitting$: Observable<Cardano.NewTxAlonzo>;
store: DocumentStore<NewTxAlonzoWithSlot[]>;
tipSlot$: Observable<Cardano.Slot>;
stabilityWindowSlotsCount?: number;
logger?: Logger;
}

enum txSource {
store,
confirmed,
submitting
}

// 3k/f (where k is the security parameter in genesis, and f is the active slot co-efficient parameter
// in genesis that determines the probability for amount of blocks created in an epoch.)
const kStabilityWindowSlotsCount = 129_600; // 3k/f on current mainnet

export const createTransactionReemitter = ({
rollback$,
confirmed$,
submitting$,
store,
tipSlot$,
stabilityWindowSlotsCount = kStabilityWindowSlotsCount,
logger = dummyLogger
}: TransactionReemitterProps): Observable<Cardano.NewTxAlonzo> => {
const volatileTransactions$ = merge(
store.get().pipe(
mergeMap((txs) => from(txs)),
map((tx) => ({ source: txSource.store, tx }))
),
confirmed$.pipe(map((tx) => ({ source: txSource.confirmed, tx }))),
submitting$.pipe(map((tx) => ({ source: txSource.submitting, tx: { ...tx, slot: null! } })))
).pipe(
scan((volatiles, { tx, source }) => {
switch (source) {
case txSource.store: {
// Do not calculate stability window for old transactions coming from the store
volatiles = [...volatiles, tx];
break;
}
case txSource.submitting: {
// Transactions in submitting are the ones reemitted. Remove them from volatiles
volatiles = volatiles.filter((v) => v.id !== tx.id);
store.set(volatiles);
break;
}
case txSource.confirmed: {
const oldestAcceptedSlot = tx.slot > stabilityWindowSlotsCount ? tx.slot - stabilityWindowSlotsCount : 0;
// Remove transactions considered stable
volatiles = [...volatiles.filter(({ slot }) => slot > oldestAcceptedSlot), tx];
store.set(volatiles);
break;
}
}
return volatiles;
}, [] as NewTxAlonzoWithSlot[])
);

return rollback$.pipe(
withLatestFrom(tipSlot$),
map(([tx, tipSlot]) => {
const invalidHereafter = tx.body?.validityInterval?.invalidHereafter;
if (invalidHereafter && tipSlot > invalidHereafter) {
const err = new TransactionReemitError(
TransactionReemitErrorCode.invalidHereafter,
`Rolled back transaction with id ${tx.id} is no longer valid`
);
logger.error(err.message, err.code);
return;
}
return tx;
}),
filter((tx) => !!tx),
withLatestFrom(volatileTransactions$),
map(([tx, volatiles]) => {
// Get the confirmed NewTxAlonzo transaction to be retried
const reemitTx = volatiles.find((txVolatile) => txVolatile.id === tx!.id);
if (!reemitTx) {
const err = new TransactionReemitError(
TransactionReemitErrorCode.notFound,
`Could not find confirmed transaction with id ${tx!.id} that was rolled back`
);
logger.error(err.message, err.code);
}
return reemitTx!;
}),
filter((tx) => !!tx)
);
};

0 comments on commit 2a4ccb0

Please sign in to comment.