From bbd0d9c56fb7922d3c7a6bd4ae4e01406ce3a36f Mon Sep 17 00:00:00 2001 From: Harris Luo Date: Mon, 28 Oct 2024 18:06:04 -0400 Subject: [PATCH] Add tryFetchTx method to RetryTxSender --- .../solana-v1-contrib/src/retry_tx_sender.ts | 185 +++++++++++++++++- 1 file changed, 183 insertions(+), 2 deletions(-) diff --git a/packages/solana-v1-contrib/src/retry_tx_sender.ts b/packages/solana-v1-contrib/src/retry_tx_sender.ts index 1a087a1..9a383a4 100644 --- a/packages/solana-v1-contrib/src/retry_tx_sender.ts +++ b/packages/solana-v1-contrib/src/retry_tx_sender.ts @@ -13,6 +13,7 @@ import { isNullLike, waitMS } from '@tensor-hq/ts-utils'; import bs58 from 'bs58'; import { backOff } from 'exponential-backoff'; import { getLatestBlockHeight } from './rpc'; +import { VersionedTransactionResponse } from '@solana/web3.js'; const BLOCK_TIME_MS = 400; @@ -58,6 +59,7 @@ export class RetryTxSender { private start?: number; private txSig?: TransactionSignature; private confirmedTx?: ConfirmedTx; + private fetchedTx?: VersionedTransactionResponse; readonly connection: Connection; readonly additionalConnections: Connection[]; readonly logger?: Logger; @@ -93,6 +95,12 @@ export class RetryTxSender { this.retrySleep = retrySleep; } + /** + * Send transaction to RPCs and asynchronously retry sending until + * 1. The transaction is confirmed via tryConfirm/tryFetchTx + * 2. The transaction times out + * 3. Confirmation is cancelled via cancelConfirm + */ async send( tx: Transaction | VersionedTransaction, ): Promise { @@ -149,6 +157,26 @@ export class RetryTxSender { return this.txSig; } + /** + * Confirm the status of a transaction sent by this sender by + * 1. Polling getSignatureStatus + * 2. Optionally listening for the onSignature WS event + * + * Stops polling once + * 1. The transaction is confirmed + * 2. The transaction times out (via timeout promise or lastValidBlockHeight) + * 3. Confirmation is cancelled via cancelConfirm + * + * Notes: + * * After confirming, subsequent calls will return a cached ConfirmedTx + * * tryConfirm should not be invoked multiple times in parallel + * * tryConfirm should not be invoked in parallel with tryFetchTx + * + * @param lastValidBlockHeight cancel tx confirmation loop once this block height is reached + * @param opts { + * @param disableWs don't listen for onSignature WS events when confirming + * } + */ async tryConfirm( lastValidBlockHeight?: number, opts?: ConfirmOpts, @@ -162,6 +190,7 @@ export class RetryTxSender { throw new Error('you need to send the tx first'); } + this.done = false; try { const result = await this._confirmTransaction( this.txSig, @@ -182,6 +211,56 @@ export class RetryTxSender { } } + /** + * Fetch a transaction sent by this sender by polling getTransaction. + * + * Stops polling once + * 1. The transaction is fetched + * 2. The transaction times out (via timeout promise or lastValidBlockHeight) + * 3. Confirmation is cancelled via cancelConfirm + * + * Notes: + * * After confirming, subsequent calls will return a cached tx + * * tryFetchTx should not be invoked multiple times in parallel + * * tryFetchTx should not be invoked in parallel with tryConfirm + * + * @param lastValidBlockHeight cancel tx confirmation loop once this block height is reached + * @param opts { + * @param disableWs don't listen for onSignature WS events when confirming + * } + */ + async tryFetchTx( + lastValidBlockHeight?: number, + ): Promise { + if (this.fetchedTx) { + this.logger?.info('✅ Tx already fetched'); + return this.fetchedTx; + } + + if (!this.txSig) { + throw new Error('you need to send the tx first'); + } + + this.done = false; + try { + this.fetchedTx = await this._fetchTransaction( + this.txSig, + lastValidBlockHeight, + ); + this.confirmedTx = { + txSig: this.txSig, + slot: this.fetchedTx.slot, + err: this.fetchedTx.meta?.err ?? null, + }; + return this.fetchedTx; + } catch (e) { + this.logger?.error(`${JSON.stringify(e)}`); + throw e; + } finally { + this._stopWaiting(); + } + } + cancelConfirm() { if (this.cancelReference.resolve) { this.cancelReference.resolve(); @@ -204,10 +283,11 @@ export class RetryTxSender { throw new Error('signature must be base58 encoded: ' + txSig); } - if (decodedSignature.length !== 64) + if (decodedSignature.length !== 64) { throw new Error( `signature has invalid length ${decodedSignature.length} (expected 64)`, ); + } this.start = Date.now(); const subscriptionCommitment = this.opts.commitment; @@ -223,7 +303,9 @@ export class RetryTxSender { const pollPromise = backOff( async () => { - this.logger?.debug('[getSignatureStatus] Attept to get sig status'); + this.logger?.debug( + '[getSignatureStatus] Attempt to get sig status', + ); const { value, context } = await connection.getSignatureStatus( txSig, { @@ -350,6 +432,105 @@ export class RetryTxSender { return response; } + private async _fetchTransaction( + txSig: TransactionSignature, + lastValidBlockHeight?: number, + ): Promise { + this.logger?.info(`⏳ [${txSig.substring(0, 5)}] begin trying to fetch tx`); + + let decodedSignature: Uint8Array; + try { + decodedSignature = bs58.decode(txSig); + } catch (err) { + throw new Error('signature must be base58 encoded: ' + txSig); + } + + if (decodedSignature.length !== 64) { + throw new Error( + `signature has invalid length ${decodedSignature.length} (expected 64)`, + ); + } + + this.start = Date.now(); + const connections = [this.connection, ...this.additionalConnections]; + let response: VersionedTransactionResponse | null = null; + + const promises = connections.map((connection) => + backOff( + async () => { + this.logger?.debug('[getTransaction] Attempt to get sig status'); + const maybeTx = await connection.getTransaction(txSig, { + commitment: 'confirmed', + maxSupportedTransactionVersion: 0, + }); + if (!maybeTx) { + this.logger?.debug( + `[getTransaction] tx ${txSig} not found, try again in ${this.retrySleep}ms`, + ); + throw new Error(`tx ${txSig} not found`); + } + return maybeTx; + }, + { + maxDelay: this.retrySleep, + startingDelay: this.retrySleep, + numOfAttempts: Math.ceil(this.timeout / this.retrySleep), + retry: (e) => { + if ( + typeof e.message === 'string' && + e.message.endsWith('not found') + ) { + this.logger?.info(`sig ${txSig} not found yet, retrying`); + } else { + console.error(`[getTransaction] received error, ${e} retrying`); + } + return !this.done; + }, + }, + ) + .then((res) => { + response = res; + }) + .catch((err) => { + this.logger?.error( + `[${txSig.substring(0, 5)}] error polling: ${err}`, + ); + }), + ); + + await this._racePromises( + txSig, + promises, + this.timeout, + lastValidBlockHeight, + ); + + const duration = (Date.now() - this.start) / 1000; + if (response === null) { + const errMsg = `❌ [${txSig.substring( + 0, + 5, + )}] NOT confirmed in ${duration.toFixed(2)}sec`; + this.logger?.error(errMsg); + throw new Error(errMsg); + } + + if ((response).meta?.err) { + this.logger?.warn( + `⚠️ [${txSig.substring( + 0, + 5, + )}] confirmed AS FAILED TX in ${duration.toFixed(2)}sec`, + ); + } else { + this.logger?.info( + `✅ [${txSig.substring(0, 5)}] confirmed in ${duration.toFixed(2)}sec`, + ); + } + + return response; + } + private _getTimestamp(): number { return new Date().getTime(); }