diff --git a/Cargo.lock b/Cargo.lock index 844805d8f1..b1cc851915 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5702,7 +5702,7 @@ dependencies = [ [[package]] name = "pyth-lazer-agent" -version = "0.8.0" +version = "0.8.1" dependencies = [ "anyhow", "backoff", diff --git a/apps/pyth-lazer-agent/Cargo.toml b/apps/pyth-lazer-agent/Cargo.toml index 12d1d72e30..76ccf04630 100644 --- a/apps/pyth-lazer-agent/Cargo.toml +++ b/apps/pyth-lazer-agent/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-lazer-agent" -version = "0.8.0" +version = "0.8.1" edition = "2024" description = "Pyth Lazer Agent" license = "Apache-2.0" diff --git a/apps/pyth-lazer-agent/src/jrpc_handle.rs b/apps/pyth-lazer-agent/src/jrpc_handle.rs index 3cba8048af..2e4352166a 100644 --- a/apps/pyth-lazer-agent/src/jrpc_handle.rs +++ b/apps/pyth-lazer-agent/src/jrpc_handle.rs @@ -103,20 +103,36 @@ async fn handle_jrpc_inner( match serde_json::from_slice::(receive_buf.as_slice()) { Ok(jrpc_request) => match jrpc_request.params { JrpcCall::PushUpdate(request_params) => { - handle_push_update( - sender, - lazer_publisher, - request_params, - jrpc_request.id.clone(), - ) - .await + match lazer_publisher + .push_feed_update(request_params.clone().into()) + .await + { + Ok(()) => send_update_success_response(sender, jrpc_request.id).await, + Err(err) => { + send_update_failure_response(sender, request_params, jrpc_request.id, err) + .await + } + } } - JrpcCall::PushUpdates(request_params) => { - for feed in request_params { - handle_push_update(sender, lazer_publisher, feed, jrpc_request.id.clone()) - .await?; + JrpcCall::PushUpdates(request_params_batch) => { + for request_params in request_params_batch { + match lazer_publisher + .push_feed_update(request_params.clone().into()) + .await + { + Ok(()) => (), + Err(err) => { + return send_update_failure_response( + sender, + request_params, + jrpc_request.id, + err, + ) + .await; + } + } } - Ok(()) + send_update_success_response(sender, jrpc_request.id).await } JrpcCall::GetMetadata(request_params) => match jrpc_request.id { JrpcId::Null => { @@ -201,37 +217,18 @@ fn filter_symbols( res } -async fn handle_push_update( +async fn send_update_success_response( sender: &mut Sender, - lazer_publisher: &LazerPublisher, - request_params: FeedUpdateParams, request_id: JrpcId, ) -> anyhow::Result<()> { - match lazer_publisher - .push_feed_update(request_params.clone().into()) - .await - { - Ok(_) => match request_id { - JrpcId::Null => Ok(()), - _ => { - send_json( - sender, - &JrpcSuccessResponse:: { - jsonrpc: JsonRpcVersion::V2, - result: "success".to_string(), - id: request_id, - }, - ) - .await - } - }, - Err(err) => { - debug!("error while sending updates: {:?}", err); + match request_id { + JrpcId::Null => Ok(()), + _ => { send_json( sender, - &JrpcErrorResponse { + &JrpcSuccessResponse:: { jsonrpc: JsonRpcVersion::V2, - error: JrpcError::SendUpdateError(request_params).into(), + result: "success".to_string(), id: request_id, }, ) @@ -240,6 +237,24 @@ async fn handle_push_update( } } +async fn send_update_failure_response( + sender: &mut Sender, + request_params: FeedUpdateParams, + request_id: JrpcId, + err: Error, +) -> anyhow::Result<()> { + debug!("error while sending updates: {:?}", err); + send_json( + sender, + &JrpcErrorResponse { + jsonrpc: JsonRpcVersion::V2, + error: JrpcError::SendUpdateError(request_params).into(), + id: request_id, + }, + ) + .await +} + async fn handle_get_metadata( sender: &mut Sender, config: &Config,