From b0ebad795ea03f1fb81b645b6f4b891594c717fb Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Tue, 11 Nov 2025 10:08:18 -0600 Subject: [PATCH 1/2] fix(pyth-lazer-agent): Only send one success response per batch update --- apps/pyth-lazer-agent/src/jrpc_handle.rs | 74 ++++++++++++++++++------ 1 file changed, 55 insertions(+), 19 deletions(-) diff --git a/apps/pyth-lazer-agent/src/jrpc_handle.rs b/apps/pyth-lazer-agent/src/jrpc_handle.rs index 3cba8048af..a205bee14f 100644 --- a/apps/pyth-lazer-agent/src/jrpc_handle.rs +++ b/apps/pyth-lazer-agent/src/jrpc_handle.rs @@ -112,11 +112,13 @@ async fn handle_jrpc_inner( .await } JrpcCall::PushUpdates(request_params) => { - for feed in request_params { - handle_push_update(sender, lazer_publisher, feed, jrpc_request.id.clone()) - .await?; - } - Ok(()) + handle_batch_push_update( + sender, + lazer_publisher, + &request_params, + jrpc_request.id.clone(), + ) + .await } JrpcCall::GetMetadata(request_params) => match jrpc_request.id { JrpcId::Null => { @@ -201,6 +203,26 @@ fn filter_symbols( res } +async fn send_update_success_response( + sender: &mut Sender, + request_id: JrpcId, +) -> anyhow::Result<()> { + match request_id { + JrpcId::Null => Ok(()), + _ => { + send_json( + sender, + &JrpcSuccessResponse:: { + jsonrpc: JsonRpcVersion::V2, + result: "success".to_string(), + id: request_id, + }, + ) + .await + } + } +} + async fn handle_push_update( sender: &mut Sender, lazer_publisher: &LazerPublisher, @@ -211,20 +233,7 @@ async fn handle_push_update( .push_feed_update(request_params.clone().into()) .await { - Ok(_) => match request_id { - JrpcId::Null => Ok(()), - _ => { - send_json( - sender, - &JrpcSuccessResponse:: { - jsonrpc: JsonRpcVersion::V2, - result: "success".to_string(), - id: request_id, - }, - ) - .await - } - }, + Ok(_) => send_update_success_response(sender, request_id).await, Err(err) => { debug!("error while sending updates: {:?}", err); send_json( @@ -240,6 +249,33 @@ async fn handle_push_update( } } +async fn handle_batch_push_update( + sender: &mut Sender, + lazer_publisher: &LazerPublisher, + batch_request: &[FeedUpdateParams], + request_id: JrpcId, +) -> anyhow::Result<()> { + for request_params in batch_request.iter() { + if let Err(err) = lazer_publisher + .push_feed_update(request_params.clone().into()) + .await + { + debug!("error while sending updates: {:?}", err); + send_json( + sender, + &JrpcErrorResponse { + jsonrpc: JsonRpcVersion::V2, + error: JrpcError::SendUpdateError(request_params.clone()).into(), + id: request_id.clone(), + }, + ) + .await?; + anyhow::bail!("Error processing batch update: {:?}", err); + } + } + send_update_success_response(sender, request_id).await +} + async fn handle_get_metadata( sender: &mut Sender, config: &Config, From 4a49e26e4b622d486a6207e28b46b29bb54a7a69 Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Thu, 13 Nov 2025 10:54:33 -0600 Subject: [PATCH 2/2] refactor --- Cargo.lock | 2 +- apps/pyth-lazer-agent/Cargo.toml | 2 +- apps/pyth-lazer-agent/src/jrpc_handle.rs | 103 +++++++++-------------- 3 files changed, 43 insertions(+), 64 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 844805d8f1..b1cc851915 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5702,7 +5702,7 @@ dependencies = [ [[package]] name = "pyth-lazer-agent" -version = "0.8.0" +version = "0.8.1" dependencies = [ "anyhow", "backoff", diff --git a/apps/pyth-lazer-agent/Cargo.toml b/apps/pyth-lazer-agent/Cargo.toml index 12d1d72e30..76ccf04630 100644 --- a/apps/pyth-lazer-agent/Cargo.toml +++ b/apps/pyth-lazer-agent/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-lazer-agent" -version = "0.8.0" +version = "0.8.1" edition = "2024" description = "Pyth Lazer Agent" license = "Apache-2.0" diff --git a/apps/pyth-lazer-agent/src/jrpc_handle.rs b/apps/pyth-lazer-agent/src/jrpc_handle.rs index a205bee14f..2e4352166a 100644 --- a/apps/pyth-lazer-agent/src/jrpc_handle.rs +++ b/apps/pyth-lazer-agent/src/jrpc_handle.rs @@ -103,22 +103,36 @@ async fn handle_jrpc_inner( match serde_json::from_slice::(receive_buf.as_slice()) { Ok(jrpc_request) => match jrpc_request.params { JrpcCall::PushUpdate(request_params) => { - handle_push_update( - sender, - lazer_publisher, - request_params, - jrpc_request.id.clone(), - ) - .await + match lazer_publisher + .push_feed_update(request_params.clone().into()) + .await + { + Ok(()) => send_update_success_response(sender, jrpc_request.id).await, + Err(err) => { + send_update_failure_response(sender, request_params, jrpc_request.id, err) + .await + } + } } - JrpcCall::PushUpdates(request_params) => { - handle_batch_push_update( - sender, - lazer_publisher, - &request_params, - jrpc_request.id.clone(), - ) - .await + JrpcCall::PushUpdates(request_params_batch) => { + for request_params in request_params_batch { + match lazer_publisher + .push_feed_update(request_params.clone().into()) + .await + { + Ok(()) => (), + Err(err) => { + return send_update_failure_response( + sender, + request_params, + jrpc_request.id, + err, + ) + .await; + } + } + } + send_update_success_response(sender, jrpc_request.id).await } JrpcCall::GetMetadata(request_params) => match jrpc_request.id { JrpcId::Null => { @@ -223,57 +237,22 @@ async fn send_update_success_response( } } -async fn handle_push_update( +async fn send_update_failure_response( sender: &mut Sender, - lazer_publisher: &LazerPublisher, request_params: FeedUpdateParams, request_id: JrpcId, + err: Error, ) -> anyhow::Result<()> { - match lazer_publisher - .push_feed_update(request_params.clone().into()) - .await - { - Ok(_) => send_update_success_response(sender, request_id).await, - Err(err) => { - debug!("error while sending updates: {:?}", err); - send_json( - sender, - &JrpcErrorResponse { - jsonrpc: JsonRpcVersion::V2, - error: JrpcError::SendUpdateError(request_params).into(), - id: request_id, - }, - ) - .await - } - } -} - -async fn handle_batch_push_update( - sender: &mut Sender, - lazer_publisher: &LazerPublisher, - batch_request: &[FeedUpdateParams], - request_id: JrpcId, -) -> anyhow::Result<()> { - for request_params in batch_request.iter() { - if let Err(err) = lazer_publisher - .push_feed_update(request_params.clone().into()) - .await - { - debug!("error while sending updates: {:?}", err); - send_json( - sender, - &JrpcErrorResponse { - jsonrpc: JsonRpcVersion::V2, - error: JrpcError::SendUpdateError(request_params.clone()).into(), - id: request_id.clone(), - }, - ) - .await?; - anyhow::bail!("Error processing batch update: {:?}", err); - } - } - send_update_success_response(sender, request_id).await + debug!("error while sending updates: {:?}", err); + send_json( + sender, + &JrpcErrorResponse { + jsonrpc: JsonRpcVersion::V2, + error: JrpcError::SendUpdateError(request_params).into(), + id: request_id, + }, + ) + .await } async fn handle_get_metadata(