Skip to content

Commit

Permalink
fix: processes transactions for a tree sequentially to make sure ther…
Browse files Browse the repository at this point in the history
…e is on out of order processing. process the transaction by ops instead of queueing to TXN_FILL redis stream.
  • Loading branch information
kespinola committed May 15, 2024
1 parent bd0f4c7 commit 5721335
Show file tree
Hide file tree
Showing 13 changed files with 341 additions and 182 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions blockbuster/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ pub fn order_instructions<'a>(
message_instructions: &'a [CompiledInstruction],
meta_inner_instructions: &'a [InnerInstructions],
) -> VecDeque<(IxPair<'a>, Option<Vec<IxPair<'a>>>)> {
log::info!("message_instructions {:?}", message_instructions);
log::info!("meta_inner_instructions {:?}", meta_inner_instructions);
log::info!("account_keys {:?}", account_keys);

let mut ordered_ixs: VecDeque<(IxPair, Option<Vec<IxPair>>)> = VecDeque::new();

// Get inner instructions.
Expand Down
2 changes: 1 addition & 1 deletion core/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub struct PoolArgs {
///// # Returns
/////
///// * `Result<DatabaseConnection, DbErr>` - On success, returns a `DatabaseConnection`. On failure, returns a `DbErr`.
pub async fn connect_db(config: PoolArgs) -> Result<PgPool, sqlx::Error> {
pub async fn connect_db(config: &PoolArgs) -> Result<PgPool, sqlx::Error> {
let options: PgConnectOptions = config.database_url.parse()?;

PgPoolOptions::new()
Expand Down
4 changes: 2 additions & 2 deletions core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ pub struct MetricsArgs {
pub metrics_prefix: String,
}

pub fn setup_metrics(config: MetricsArgs) -> Result<()> {
let host = (config.metrics_host, config.metrics_port);
pub fn setup_metrics(config: &MetricsArgs) -> Result<()> {
let host = (config.metrics_host.clone(), config.metrics_port);

let socket = UdpSocket::bind("0.0.0.0:0")?;
socket.set_nonblocking(true)?;
Expand Down
2 changes: 1 addition & 1 deletion core/src/plerkle_messenger_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub struct QueuePool {
}

impl QueuePool {
pub async fn try_from_config(config: QueueArgs) -> anyhow::Result<Self, QueuePoolError> {
pub async fn try_from_config(config: &QueueArgs) -> anyhow::Result<Self, QueuePoolError> {
let size = usize::try_from(config.messenger_queue_connections)?;
let (tx, rx) = mpsc::channel(size);

Expand Down
4 changes: 2 additions & 2 deletions core/src/solana_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ pub struct SolanaRpcArgs {
pub struct Rpc(Arc<RpcClient>);

impl Rpc {
pub fn from_config(config: SolanaRpcArgs) -> Self {
Rpc(Arc::new(RpcClient::new(config.solana_rpc_url)))
pub fn from_config(config: &SolanaRpcArgs) -> Self {
Rpc(Arc::new(RpcClient::new(config.solana_rpc_url.clone())))
}

pub async fn get_transaction(
Expand Down
4 changes: 4 additions & 0 deletions ops/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ anchor-client = { workspace = true }
anyhow = { workspace = true }
backon = { workspace = true }
borsh = { workspace = true }
bs58 = { workspace = true }
cadence = { workspace = true }
cadence-macros = { workspace = true }
clap = { workspace = true, features = ["derive", "cargo", "env"] }
Expand All @@ -27,11 +28,14 @@ log = { workspace = true }
mpl-bubblegum = { workspace = true }
plerkle_messenger = { workspace = true }
plerkle_serialization = { workspace = true }
program_transformers = { workspace = true }
sea-orm = { workspace = true }
solana-account-decoder = { workspace = true }
solana-client = { workspace = true }
solana-program = { workspace = true }
solana-sdk = { workspace = true }
solana-transaction-status = { workspace = true }
spl-account-compression = { workspace = true }
sqlx = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
4 changes: 2 additions & 2 deletions ops/src/account/program.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ fn parse_pubkey(s: &str) -> Result<Pubkey, &'static str> {
}

pub async fn run(config: Args) -> Result<()> {
let rpc = Rpc::from_config(config.solana);
let queue = QueuePool::try_from_config(config.queue).await?;
let rpc = Rpc::from_config(&config.solana);
let queue = QueuePool::try_from_config(&config.queue).await?;

let accounts = rpc.get_program_accounts(&config.program, None).await?;

Expand Down
4 changes: 2 additions & 2 deletions ops/src/account/single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ fn parse_pubkey(s: &str) -> Result<Pubkey, &'static str> {
}

pub async fn run(config: Args) -> Result<()> {
let rpc = Rpc::from_config(config.solana);
let queue = QueuePool::try_from_config(config.queue).await?;
let rpc = Rpc::from_config(&config.solana);
let queue = QueuePool::try_from_config(&config.queue).await?;

let AccountDetails {
account,
Expand Down
5 changes: 3 additions & 2 deletions ops/src/bubblegum/audit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ pub struct Args {
pub batch_size: u64,
}

/// The `audit` commands checks `cl_audits_v2` for any failed transactions and logs them to stdout.
pub async fn run(config: Args) -> Result<()> {
let pool = connect_db(config.database).await?;
let pool = connect_db(&config.database).await?;

let solana_rpc = Rpc::from_config(config.solana);
let solana_rpc = Rpc::from_config(&config.solana);

let mut output = stdout();
let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
Expand Down
Loading

0 comments on commit 5721335

Please sign in to comment.