Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apps/pyth-lazer-agent/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pyth-lazer-agent"
version = "0.8.0"
version = "0.8.1"
edition = "2024"
description = "Pyth Lazer Agent"
license = "Apache-2.0"
Expand Down
89 changes: 52 additions & 37 deletions apps/pyth-lazer-agent/src/jrpc_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,20 +103,36 @@ async fn handle_jrpc_inner<T: AsyncRead + AsyncWrite + Unpin>(
match serde_json::from_slice::<PythLazerAgentJrpcV1>(receive_buf.as_slice()) {
Ok(jrpc_request) => match jrpc_request.params {
JrpcCall::PushUpdate(request_params) => {
handle_push_update(
sender,
lazer_publisher,
request_params,
jrpc_request.id.clone(),
)
.await
match lazer_publisher
.push_feed_update(request_params.clone().into())
.await
{
Ok(()) => send_update_success_response(sender, jrpc_request.id).await,
Err(err) => {
send_update_failure_response(sender, request_params, jrpc_request.id, err)
.await
}
}
}
JrpcCall::PushUpdates(request_params) => {
for feed in request_params {
handle_push_update(sender, lazer_publisher, feed, jrpc_request.id.clone())
.await?;
JrpcCall::PushUpdates(request_params_batch) => {
for request_params in request_params_batch {
match lazer_publisher
.push_feed_update(request_params.clone().into())
.await
{
Ok(()) => (),
Err(err) => {
return send_update_failure_response(
sender,
request_params,
jrpc_request.id,
err,
)
.await;
}
}
}
Ok(())
send_update_success_response(sender, jrpc_request.id).await
}
JrpcCall::GetMetadata(request_params) => match jrpc_request.id {
JrpcId::Null => {
Expand Down Expand Up @@ -201,37 +217,18 @@ fn filter_symbols(
res
}

async fn handle_push_update<T: AsyncRead + AsyncWrite + Unpin>(
async fn send_update_success_response<T: AsyncRead + AsyncWrite + Unpin>(
sender: &mut Sender<T>,
lazer_publisher: &LazerPublisher,
request_params: FeedUpdateParams,
request_id: JrpcId,
) -> anyhow::Result<()> {
match lazer_publisher
.push_feed_update(request_params.clone().into())
.await
{
Ok(_) => match request_id {
JrpcId::Null => Ok(()),
_ => {
send_json(
sender,
&JrpcSuccessResponse::<String> {
jsonrpc: JsonRpcVersion::V2,
result: "success".to_string(),
id: request_id,
},
)
.await
}
},
Err(err) => {
debug!("error while sending updates: {:?}", err);
match request_id {
JrpcId::Null => Ok(()),
_ => {
send_json(
sender,
&JrpcErrorResponse {
&JrpcSuccessResponse::<String> {
jsonrpc: JsonRpcVersion::V2,
error: JrpcError::SendUpdateError(request_params).into(),
result: "success".to_string(),
id: request_id,
},
)
Expand All @@ -240,6 +237,24 @@ async fn handle_push_update<T: AsyncRead + AsyncWrite + Unpin>(
}
}

async fn send_update_failure_response<T: AsyncRead + AsyncWrite + Unpin>(
sender: &mut Sender<T>,
request_params: FeedUpdateParams,
request_id: JrpcId,
err: Error,
) -> anyhow::Result<()> {
debug!("error while sending updates: {:?}", err);
send_json(
sender,
&JrpcErrorResponse {
jsonrpc: JsonRpcVersion::V2,
error: JrpcError::SendUpdateError(request_params).into(),
id: request_id,
},
)
.await
}

async fn handle_get_metadata<T: AsyncRead + AsyncWrite + Unpin>(
sender: &mut Sender<T>,
config: &Config,
Expand Down