Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 183 additions & 2 deletions packages/solana-v1-contrib/src/retry_tx_sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { isNullLike, waitMS } from '@tensor-hq/ts-utils';
import bs58 from 'bs58';
import { backOff } from 'exponential-backoff';
import { getLatestBlockHeight } from './rpc';
import { VersionedTransactionResponse } from '@solana/web3.js';

const BLOCK_TIME_MS = 400;

Expand Down Expand Up @@ -58,6 +59,7 @@ export class RetryTxSender {
private start?: number;
private txSig?: TransactionSignature;
private confirmedTx?: ConfirmedTx;
private fetchedTx?: VersionedTransactionResponse;
readonly connection: Connection;
readonly additionalConnections: Connection[];
readonly logger?: Logger;
Expand Down Expand Up @@ -93,6 +95,12 @@ export class RetryTxSender {
this.retrySleep = retrySleep;
}

/**
* Send transaction to RPCs and asynchronously retry sending until
* 1. The transaction is confirmed via tryConfirm/tryFetchTx
* 2. The transaction times out
* 3. Confirmation is cancelled via cancelConfirm
*/
async send(
tx: Transaction | VersionedTransaction,
): Promise<TransactionSignature> {
Expand Down Expand Up @@ -149,6 +157,26 @@ export class RetryTxSender {
return this.txSig;
}

/**
* Confirm the status of a transaction sent by this sender by
* 1. Polling getSignatureStatus
* 2. Optionally listening for the onSignature WS event
*
* Stops polling once
* 1. The transaction is confirmed
* 2. The transaction times out (via timeout promise or lastValidBlockHeight)
* 3. Confirmation is cancelled via cancelConfirm
*
* Notes:
* * After confirming, subsequent calls will return a cached ConfirmedTx
* * tryConfirm should not be invoked multiple times in parallel
* * tryConfirm should not be invoked in parallel with tryFetchTx
*
* @param lastValidBlockHeight cancel tx confirmation loop once this block height is reached
* @param opts {
* @param disableWs don't listen for onSignature WS events when confirming
* }
*/
async tryConfirm(
lastValidBlockHeight?: number,
opts?: ConfirmOpts,
Expand All @@ -162,6 +190,7 @@ export class RetryTxSender {
throw new Error('you need to send the tx first');
}

this.done = false;
try {
const result = await this._confirmTransaction(
this.txSig,
Expand All @@ -182,6 +211,56 @@ export class RetryTxSender {
}
}

/**
* Fetch a transaction sent by this sender by polling getTransaction.
*
* Stops polling once
* 1. The transaction is fetched
* 2. The transaction times out (via timeout promise or lastValidBlockHeight)
* 3. Confirmation is cancelled via cancelConfirm
*
* Notes:
* * After confirming, subsequent calls will return a cached tx
* * tryFetchTx should not be invoked multiple times in parallel
* * tryFetchTx should not be invoked in parallel with tryConfirm
*
* @param lastValidBlockHeight cancel tx confirmation loop once this block height is reached
* @param opts {
* @param disableWs don't listen for onSignature WS events when confirming
* }
*/
async tryFetchTx(
lastValidBlockHeight?: number,
): Promise<VersionedTransactionResponse> {
if (this.fetchedTx) {
this.logger?.info('✅ Tx already fetched');
return this.fetchedTx;
}

if (!this.txSig) {
throw new Error('you need to send the tx first');
}

this.done = false;
try {
this.fetchedTx = await this._fetchTransaction(
this.txSig,
lastValidBlockHeight,
);
this.confirmedTx = {
txSig: this.txSig,
slot: this.fetchedTx.slot,
err: this.fetchedTx.meta?.err ?? null,
};
return this.fetchedTx;
} catch (e) {
this.logger?.error(`${JSON.stringify(e)}`);
throw e;
} finally {
this._stopWaiting();
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this wouldn't have WS right? Did you ever figure out why WS wasn't ever really working with tryConfirm (polling almost always beat it).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup this wouldn't have WS, just polling. Haven't had the chance to look into WS in tryConfirm yet, will do tn.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After some testing, it looks like WS confirming is reliable. Both WS and polling at 300 ms intervals are roughly equally likely to pick up a confirmed status, and turning the poll interval up from 2000 ms wasn't strictly necessary since WS would have picked it up.

Polling getTransaction at 300ms intervals appears to yield similar results to WS + 300 ms getSignatureStatus poll, with the added benefit of eliminating the additional call to fetch the actual tx. So the getTransaction approach is promising.


cancelConfirm() {
if (this.cancelReference.resolve) {
this.cancelReference.resolve();
Expand All @@ -204,10 +283,11 @@ export class RetryTxSender {
throw new Error('signature must be base58 encoded: ' + txSig);
}

if (decodedSignature.length !== 64)
if (decodedSignature.length !== 64) {
throw new Error(
`signature has invalid length ${decodedSignature.length} (expected 64)`,
);
}

this.start = Date.now();
const subscriptionCommitment = this.opts.commitment;
Expand All @@ -223,7 +303,9 @@ export class RetryTxSender {

const pollPromise = backOff(
async () => {
this.logger?.debug('[getSignatureStatus] Attept to get sig status');
this.logger?.debug(
'[getSignatureStatus] Attempt to get sig status',
);
const { value, context } = await connection.getSignatureStatus(
txSig,
{
Expand Down Expand Up @@ -350,6 +432,105 @@ export class RetryTxSender {
return response;
}

private async _fetchTransaction(
txSig: TransactionSignature,
lastValidBlockHeight?: number,
): Promise<VersionedTransactionResponse> {
this.logger?.info(`⏳ [${txSig.substring(0, 5)}] begin trying to fetch tx`);

let decodedSignature: Uint8Array;
try {
decodedSignature = bs58.decode(txSig);
} catch (err) {
throw new Error('signature must be base58 encoded: ' + txSig);
}

if (decodedSignature.length !== 64) {
throw new Error(
`signature has invalid length ${decodedSignature.length} (expected 64)`,
);
}

this.start = Date.now();
const connections = [this.connection, ...this.additionalConnections];
let response: VersionedTransactionResponse | null = null;

const promises = connections.map((connection) =>
backOff(
async () => {
this.logger?.debug('[getTransaction] Attempt to get sig status');
const maybeTx = await connection.getTransaction(txSig, {
commitment: 'confirmed',
maxSupportedTransactionVersion: 0,
});
if (!maybeTx) {
this.logger?.debug(
`[getTransaction] tx ${txSig} not found, try again in ${this.retrySleep}ms`,
);
throw new Error(`tx ${txSig} not found`);
}
return maybeTx;
},
{
maxDelay: this.retrySleep,
startingDelay: this.retrySleep,
numOfAttempts: Math.ceil(this.timeout / this.retrySleep),
retry: (e) => {
if (
typeof e.message === 'string' &&
e.message.endsWith('not found')
) {
this.logger?.info(`sig ${txSig} not found yet, retrying`);
} else {
console.error(`[getTransaction] received error, ${e} retrying`);
}
return !this.done;
},
},
)
.then((res) => {
response = res;
})
.catch((err) => {
this.logger?.error(
`[${txSig.substring(0, 5)}] error polling: ${err}`,
);
}),
);

await this._racePromises(
txSig,
promises,
this.timeout,
lastValidBlockHeight,
);

const duration = (Date.now() - this.start) / 1000;
if (response === null) {
const errMsg = `❌ [${txSig.substring(
0,
5,
)}] NOT confirmed in ${duration.toFixed(2)}sec`;
this.logger?.error(errMsg);
throw new Error(errMsg);
}

if ((<VersionedTransactionResponse>response).meta?.err) {
this.logger?.warn(
`⚠️ [${txSig.substring(
0,
5,
)}] confirmed AS FAILED TX in ${duration.toFixed(2)}sec`,
);
} else {
this.logger?.info(
`✅ [${txSig.substring(0, 5)}] confirmed in ${duration.toFixed(2)}sec`,
);
}

return response;
}

private _getTimestamp(): number {
return new Date().getTime();
}
Expand Down
Loading