Skip to content

Commit

Permalink
fixup! Add IPC transport implementation for Unix
Browse files Browse the repository at this point in the history
  • Loading branch information
FelipeRosa committed Dec 15, 2020
1 parent 93d190f commit 7f34638
Showing 1 changed file with 28 additions and 29 deletions.
57 changes: 28 additions & 29 deletions src/transports/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,35 +137,7 @@ async fn run_server(unix_stream: UnixStream, messages_rx: mpsc::UnboundedReceive
};

for output in outputs {
let id = output.id().clone();

let value = match helpers::to_result_from_output(output) {
Ok(value) => value,
Err(err) => {
log::warn!("Unable to parse output into rpc::Value: {:?}", err);
continue;
},
};

let id = match id {
rpc::Id::Num(num) => num as usize,
_ => {
log::warn!("Got unsupported response (id: {:?})", id);
continue;
},
};

let response_tx = match pending_response_txs.remove(&id) {
Some(response_tx) => response_tx,
None => {
log::warn!("Got response for unknown request (id: {:?})", id);
continue;
},
};

if let Err(err) = response_tx.send(value) {
log::warn!("Sending a response to deallocated channel: {:?}", err);
}
let _ = respond(&mut pending_response_txs, output);
}
}
Err(err) => log::warn!("Got bad IPC response bytes: {:?}", err),
Expand All @@ -183,6 +155,33 @@ async fn run_server(unix_stream: UnixStream, messages_rx: mpsc::UnboundedReceive
Ok(())
}

fn respond(
pending_response_txs: &mut BTreeMap<RequestId, oneshot::Sender<rpc::Value>>,
output: rpc::Output,
) -> std::result::Result<(), ()> {
let id = output.id().clone();

let value = helpers::to_result_from_output(output).map_err(|err| {
log::warn!("Unable to parse output into rpc::Value: {:?}", err);
})?;

let id = match id {
rpc::Id::Num(num) => num as usize,
_ => {
log::warn!("Got unsupported response (id: {:?})", id);
return Err(());
}
};

let response_tx = pending_response_txs.remove(&id).ok_or_else(|| {
log::warn!("Got response for unknown request (id: {:?})", id);
})?;

response_tx.send(value).map_err(|err| {
log::warn!("Sending a response to deallocated channel: {:?}", err);
})
}

impl From<mpsc::error::SendError<TransportMessage>> for Error {
fn from(err: mpsc::error::SendError<TransportMessage>) -> Self {
Error::Transport(format!("Send Error: {:?}", err))
Expand Down

0 comments on commit 7f34638

Please sign in to comment.