Skip to content

Commit

Permalink
feat(wallet): wip implementation of InMemoryTransactionTracker
Browse files Browse the repository at this point in the history
  • Loading branch information
mkazlauskas committed Oct 11, 2021
1 parent d48107e commit 65a05ee
Show file tree
Hide file tree
Showing 6 changed files with 318 additions and 34 deletions.
132 changes: 132 additions & 0 deletions packages/wallet/src/InMemoryTransactionTracker.ts
@@ -0,0 +1,132 @@
import { TransactionTracker, TransactionTrackerEvents } from './types';
import Emittery from 'emittery';
import { Hash16, Slot, Tip } from '@cardano-ogmios/schema';
import { CardanoProvider, CardanoProviderError, CardanoSerializationLib, CSL } from '@cardano-sdk/core';
import { TransactionError, TransactionFailure } from './Transaction/TransactionError';
import { Logger } from 'ts-log';

export type Milliseconds = number;

export interface InMemoryTransactionTrackerProps {
provider: CardanoProvider;
csl: CardanoSerializationLib;
logger: Logger;
maxTipFailures?: number;
pollInterval?: Milliseconds;
}

const delay = (time: Milliseconds) => new Promise<void>((resolve) => setTimeout(resolve, time));

export class InMemoryTransactionTracker extends Emittery<TransactionTrackerEvents> implements TransactionTracker {
readonly #provider: CardanoProvider;
readonly #pendingTransactions = new Map<string, Promise<void>>();
readonly #csl: CardanoSerializationLib;
readonly #logger: Logger;
readonly #maxTipFailures: number;
readonly #pollInterval: number;

constructor({ provider, csl, logger, maxTipFailures = 3, pollInterval = 2000 }: InMemoryTransactionTrackerProps) {
super();
this.#provider = provider;
this.#csl = csl;
this.#logger = logger;
this.#maxTipFailures = maxTipFailures;
this.#pollInterval = pollInterval;
}

async trackTransaction(transaction: CSL.Transaction): Promise<void> {
const body = transaction.body();
const hash = Buffer.from(this.#csl.hash_transaction(body).to_bytes()).toString('hex');

if (this.#pendingTransactions.has(hash)) {
return this.#pendingTransactions.get(hash)!;
}

const invalidHereafter = body.ttl();
if (!invalidHereafter) {
throw new TransactionError(TransactionFailure.CannotTrack, undefined, 'no TTL');
}

const promise = this.#trackTransaction(hash, invalidHereafter);
this.#pendingTransactions.set(hash, promise);
this.emit('transaction', { transaction, confirmed: promise });
// eslint-disable-next-line promise/catch-or-return
promise.catch(() => void 0).then(() => this.#pendingTransactions.delete(hash));

return promise;
}

async #trackTransaction(hash: Hash16, invalidHereafter: Slot, numTipFailures = 0): Promise<void> {
await delay(this.#pollInterval);
try {
const tx = await this.#provider.queryTransactionsByHashes([hash]);
if (tx.length > 0) return; // done
return this.#onTransactionNotFound(hash, invalidHereafter, numTipFailures);
} catch (error: unknown) {
const providerError = this.#formatCardanoProviderError(error);
if (providerError.status_code !== 404) {
throw new TransactionError(TransactionFailure.CannotTrack, error);
}
return this.#onTransactionNotFound(hash, invalidHereafter, numTipFailures);
}
}

async #onTransactionNotFound(hash: string, invalidHereafter: number, numTipFailures: number) {
let tip: Tip | undefined;
try {
tip = await this.#provider.ledgerTip();
} catch (error: unknown) {
if (++numTipFailures > this.#maxTipFailures) {
throw new TransactionError(
TransactionFailure.CannotTrack,
error,
"can't query tip to check for transaction timeout"
);
}
this.#logger.debug(
`Failed to query ledgerTip, transaction ${hash} might already be timed out ` +
`(attempt ${numTipFailures} out of ${this.#maxTipFailures}).`,
error
);
}
if (tip && tip.slot > invalidHereafter) {
throw new TransactionError(TransactionFailure.Timeout);
}
return this.#trackTransaction(hash, invalidHereafter, numTipFailures);
}

#formatCardanoProviderError(error: unknown) {
const cardanoProviderError = error as CardanoProviderError;
if (typeof cardanoProviderError === 'string') {
throw new TransactionError(TransactionFailure.Unknown, error, cardanoProviderError);
}
if (typeof cardanoProviderError !== 'object') {
throw new TransactionError(TransactionFailure.Unknown, error, 'failed to parse error (response type)');
}
const errorAsType1 = cardanoProviderError as {
status_code: number;
message: string;
error: string;
};
if (errorAsType1.status_code) {
return errorAsType1;
}
const errorAsType2 = cardanoProviderError as {
errno: number;
message: string;
code: string;
};
if (errorAsType2.code) {
const status_code = Number.parseInt(errorAsType2.code);
if (!status_code) {
throw new TransactionError(TransactionFailure.Unknown, error, 'failed to parse error (status code)');
}
return {
status_code,
message: errorAsType1.message,
error: errorAsType2.errno.toString()
};
}
throw new TransactionError(TransactionFailure.Unknown, error, 'failed to parse error (response json)');
}
}
15 changes: 15 additions & 0 deletions packages/wallet/src/Transaction/TransactionError.ts
@@ -0,0 +1,15 @@
import { CustomError } from 'ts-custom-error';

export enum TransactionFailure {
CannotTrack = 'CANNOT_TRACK',
Unknown = 'UNKNOWN',
Timeout = 'TIMEOUT'
}

const formatDetail = (detail?: string) => (detail ? ` (${detail})` : '');

export class TransactionError extends CustomError {
constructor(reason: TransactionFailure, public innerError?: unknown, public detail?: string) {
super(`Transaction failed: ${reason}${formatDetail(detail)}`);
}
}
1 change: 1 addition & 0 deletions packages/wallet/src/index.ts
@@ -1,6 +1,7 @@
export * as Address from './Address';
export * as Transaction from './Transaction';
export * from './InMemoryUtxoRepository';
export * from './InMemoryTransactionTracker';
export * as KeyManagement from './KeyManagement';
export * from './SingleAddressWallet';
export * from './types';
2 changes: 1 addition & 1 deletion packages/wallet/src/types.ts
Expand Up @@ -30,5 +30,5 @@ export interface TransactionTracker extends Emittery<TransactionTrackerEvents> {
/**
* Track a new transaction
*/
trackTransaction(transaction: CSL.Transaction): void;
trackTransaction(transaction: CSL.Transaction): Promise<void>;
}
136 changes: 136 additions & 0 deletions packages/wallet/test/InMemoryTransactionTracker.test.ts
@@ -0,0 +1,136 @@
import { CardanoSerializationLib, CSL } from '@cardano-sdk/core';
import { flushPromises } from '@cardano-sdk/util-dev';
import { dummyLogger } from 'ts-log';
import { InMemoryTransactionTracker } from '../src/InMemoryTransactionTracker';
import { TransactionFailure } from '../src/Transaction/TransactionError';
import { providerStub, ProviderStub, queryTransactionsResult } from './ProviderStub';

const realSetImmediate = global.setImmediate;

jest.useFakeTimers();

describe('InMemoryTransactionTracker', () => {
const POLL_INTERVAL = 1000;
const MAX_TIP_FAILURES = 2;
let ledgerTipSlot: number;
let provider: ProviderStub;
let txTracker: InMemoryTransactionTracker;
let hash_transaction: jest.Mock;

const flushRequests = async (times: number) => {
for (let i = 0; i < times; i++) {
await jest.advanceTimersByTime(POLL_INTERVAL);
await flushPromises(realSetImmediate);
}
};

const mockHashTransactionReturn = (resultHash: string) => {
hash_transaction.mockReturnValue({
to_bytes() {
return Buffer.from(resultHash);
}
});
};

beforeEach(async () => {
provider = providerStub();
provider.queryTransactionsByHashes.mockReturnValue([queryTransactionsResult[0]]);
hash_transaction = jest.fn();
mockHashTransactionReturn('some-hash');
txTracker = new InMemoryTransactionTracker({
provider,
csl: { hash_transaction } as unknown as CardanoSerializationLib,
logger: dummyLogger,
pollInterval: POLL_INTERVAL,
maxTipFailures: MAX_TIP_FAILURES
});
ledgerTipSlot = (await provider.ledgerTip()).slot;
});

describe('trackTransaction', () => {
let onTransaction: jest.Mock;

beforeEach(() => {
onTransaction = jest.fn();
txTracker.on('transaction', onTransaction);
});

it('invalid transaction (no ttl)', async () => {
await expect(() =>
txTracker.trackTransaction({
body: () => ({
ttl: () => void 0
})
} as unknown as CSL.Transaction)
).rejects.toThrowError(TransactionFailure.CannotTrack);
});

describe('valid transaction', () => {
let transaction: CSL.Transaction;

beforeEach(async () => {
transaction = {
body: () => ({
ttl: () => ledgerTipSlot
})
} as unknown as CSL.Transaction;
});

describe('ledger tip fetch error', () => {
it('Ignores up to maxTipFailures (2) tip fetch errors', async () => {
provider.queryTransactionsByHashes.mockResolvedValueOnce([]);
provider.queryTransactionsByHashes.mockResolvedValueOnce([]);
provider.ledgerTip.mockRejectedValueOnce(new Error('error1'));
provider.ledgerTip.mockRejectedValueOnce(new Error('error2'));
const promise = txTracker.trackTransaction(transaction);
await flushRequests(3);
await promise;
expect(provider.ledgerTip).toBeCalledTimes(3);
expect(provider.queryTransactionsByHashes).toBeCalledTimes(3);
});
// it('Throws on >maxTipFailures (3) tip fetch errors', async () => {
// provider.queryTransactionsByHashes.mockResolvedValueOnce([]);
// provider.queryTransactionsByHashes.mockResolvedValueOnce([]);
// provider.queryTransactionsByHashes.mockResolvedValueOnce([]);
// provider.ledgerTip.mockRejectedValueOnce(new Error('error1'));
// provider.ledgerTip.mockRejectedValueOnce(new Error('error2'));
// provider.ledgerTip.mockRejectedValueOnce(new Error('error3'));
// await expect(async () => {
// const promise = txTracker.trackTransaction(transaction);
// await flushRequests(3);
// return promise;
// }).rejects.toThrowError(TransactionFailure.CannotTrack);
// });
});

it('polls provider at "pollInterval" until it returns the transaction', async () => {
provider.queryTransactionsByHashes.mockResolvedValueOnce([]);
provider.queryTransactionsByHashes.mockResolvedValueOnce([]);
const promise = txTracker.trackTransaction(transaction);
await flushRequests(3 + 1); // 3 actually needed + 1 to assert that we're no longer polling

await promise;
expect(provider.queryTransactionsByHashes).toBeCalledTimes(3);
});

it('emits "transaction" event for tracked transactions, returns promise unique per pending tx', async () => {
const promise1 = txTracker.trackTransaction(transaction);
const promise2 = txTracker.trackTransaction(transaction);
mockHashTransactionReturn('other-hash');
const promise3 = txTracker.trackTransaction(transaction);
jest.runAllTimers();
await promise1;
await promise2;
await promise3;
expect(provider.queryTransactionsByHashes).toBeCalledTimes(2);
expect(onTransaction).toBeCalledTimes(2);
// assert it clears cache
const promise4 = txTracker.trackTransaction(transaction);
jest.runAllTimers();
await promise4;
expect(provider.queryTransactionsByHashes).toBeCalledTimes(3);
expect(onTransaction).toBeCalledTimes(3);
});
});
});
});
66 changes: 33 additions & 33 deletions packages/wallet/test/ProviderStub.ts
Expand Up @@ -52,13 +52,43 @@ export const delegate = 'pool185g59xpqzt7gf0ljr8v8f3akl95qnmardf2f8auwr3ffx7atjj
export const rewards = 33_333;
export const delegationAndRewards = { delegate, rewards };

export const queryTransactionsResult = [
{
hash: 'ea1517b8c36fea3148df9aa1f49bbee66ff59a5092331a67bd8b3c427e1d79d7',
inputs: [
{
txId: 'bb217abaca60fc0ca68c1555eca6a96d2478547818ae76ce6836133f3cc546e0',
index: 0
}
],
outputs: [
{
address:
'addr_test1qpfhhfy2qgls50r9u4yh0l7z67xpg0a5rrhkmvzcuqrd0znuzcjqw982pcftgx53fu5527z2cj2tkx2h8ux2vxsg475q9gw0lz',
value: { coins: 5_000_000 }
},
{
address:
'addr_test1qplfzem2xsc29wxysf8wkdqrm4s4mmncd40qnjq9sk84l3tuzcjqw982pcftgx53fu5527z2cj2tkx2h8ux2vxsg475q52ukj5',
value: { coins: 5_000_000 }
},
{
address:
'addr_test1qqydn46r6mhge0kfpqmt36m6q43knzsd9ga32n96m89px3nuzcjqw982pcftgx53fu5527z2cj2tkx2h8ux2vxsg475qypp3m9',
value: { coins: 9_825_963 }
}
]
}
];
const queryTransactions = () => jest.fn().mockResolvedValue(queryTransactionsResult);

/**
* Provider stub for testing
*
* returns CardanoProvider-compatible object
*/
export const providerStub = () => ({
ledgerTip: async () => ({
ledgerTip: jest.fn().mockResolvedValue({
blockNo: 1_111_111,
hash: '10d64cc11e9b20e15b6c46aa7b1fed11246f437e62225655a30ea47bf8cc22d0',
slot: 37_834_496
Expand Down Expand Up @@ -92,38 +122,8 @@ export const providerStub = () => ({
}
}),
utxoDelegationAndRewards: jest.fn().mockResolvedValue({ utxo, delegationAndRewards }),
queryTransactionsByAddresses: async () =>
Promise.resolve([
{
hash: 'ea1517b8c36fea3148df9aa1f49bbee66ff59a5092331a67bd8b3c427e1d79d7',
inputs: [
{
txId: 'bb217abaca60fc0ca68c1555eca6a96d2478547818ae76ce6836133f3cc546e0',
index: 0
}
],
outputs: [
{
address:
'addr_test1qpfhhfy2qgls50r9u4yh0l7z67xpg0a5rrhkmvzcuqrd0znuzcjqw982pcftgx53fu5527z2cj2tkx2h8ux2vxsg475q9gw0lz',
value: { coins: 5_000_000 }
},
{
address:
'addr_test1qplfzem2xsc29wxysf8wkdqrm4s4mmncd40qnjq9sk84l3tuzcjqw982pcftgx53fu5527z2cj2tkx2h8ux2vxsg475q52ukj5',
value: { coins: 5_000_000 }
},
{
address:
'addr_test1qqydn46r6mhge0kfpqmt36m6q43knzsd9ga32n96m89px3nuzcjqw982pcftgx53fu5527z2cj2tkx2h8ux2vxsg475qypp3m9',
value: { coins: 9_825_963 }
}
]
}
]),
queryTransactionsByHashes: async () => {
throw new Error('Not implemented yet');
},
queryTransactionsByAddresses: queryTransactions(),
queryTransactionsByHashes: queryTransactions(),
currentWalletProtocolParameters: async () => ({
minFeeCoefficient: 44,
minFeeConstant: 155_381,
Expand Down

0 comments on commit 65a05ee

Please sign in to comment.