Skip to content

Commit

Permalink
Propagate errors
Browse files Browse the repository at this point in the history
  • Loading branch information
qm3ster committed Apr 3, 2019
1 parent a40cfe2 commit a18c79d
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 32 deletions.
5 changes: 3 additions & 2 deletions znp-rs/src/areq.rs
@@ -1,11 +1,12 @@
use crate::cmd::error::Result;
use crate::znp_codec::{Subsys, Type, ZpiCmd};
use bytes::{BufMut, BytesMut};
use serde::{de::DeserializeOwned, Serialize};
pub trait AreqIn: DeserializeOwned {
const SUBSYS: Subsys;
const CMD_ID: u8;
fn parse(res: ZpiCmd) -> Self {
res.parse().unwrap()
fn parse(res: ZpiCmd) -> Result<Self> {
res.parse().map_err(From::from)
}
}
pub trait AreqOut: Serialize {
Expand Down
21 changes: 18 additions & 3 deletions znp-rs/src/cmd/error.rs
@@ -1,10 +1,25 @@
use crate::znp_codec::Subsys;
use crate::znp_codec::{Subsys, ZpiCmd};
#[derive(Debug)]
pub enum Error {
Subsys(Subsys),
CmdId(u8),
Unimplemented { subsys: Subsys, cmd_id: u8 },
Mismatched { subsys: Subsys, cmd_id: u8 },
Payload(String),
}
impl Error {
pub fn unimplemented(cmd: &ZpiCmd) -> Self {
Error::Unimplemented {
subsys: cmd.subsys(),
cmd_id: cmd.cmd_id(),
}
}
pub fn mismatched(cmd: &ZpiCmd) -> Self {
Error::Mismatched {
subsys: cmd.subsys(),
cmd_id: cmd.cmd_id(),
}
}
}

impl From<crate::serde_znp::Error> for Error {
fn from(err: crate::serde_znp::Error) -> Self {
Error::Payload(err.to_string())
Expand Down
2 changes: 1 addition & 1 deletion znp-rs/src/cmd/mod.rs
Expand Up @@ -14,7 +14,7 @@ impl Areq {
use Subsys::*;
match cmd.subsys() {
SYS => Ok(Sys(sys::In::from_cmd(cmd)?)),
_ => Err(Error::Subsys(cmd.subsys())),
_ => Err(Error::mismatched(&cmd)),
}
}
}
2 changes: 1 addition & 1 deletion znp-rs/src/cmd/sys.rs
Expand Up @@ -39,7 +39,7 @@ impl In {
pub fn from_cmd(cmd: ZpiCmd) -> Result<Self> {
match cmd.cmd_id() {
TimerExpired::CMD_ID => Ok(In::TimerExpired(cmd.parse()?)),
_ => Err(Error::CmdId(cmd.cmd_id())),
_ => Err(Error::unimplemented(&cmd)),
}
}
}
5 changes: 3 additions & 2 deletions znp-rs/src/sreq.rs
@@ -1,3 +1,4 @@
use crate::cmd::error::Result;
use crate::znp_codec::{Subsys, Type, ZpiCmd};
use bytes::{BufMut, BytesMut};
use serde::{de::DeserializeOwned, Serialize};
Expand All @@ -13,7 +14,7 @@ pub trait Sreq: Serialize {
crate::serde_znp::serialize(writer, self).unwrap();
ZpiCmd::new(Type::SREQ, Self::SUBSYS, Self::CMD_ID, body)
}
fn parse_res(res: ZpiCmd) -> Self::Srsp {
res.parse().unwrap()
fn parse_res(res: ZpiCmd) -> Result<Self::Srsp> {
res.parse().map_err(From::from)
}
}
60 changes: 39 additions & 21 deletions znp-rs/src/znp.rs
@@ -1,5 +1,6 @@
use super::sreq::Sreq;
use super::znp_codec;
use crate::cmd;
use futures::lock::Mutex;
use std::path::Path;
use tokio::prelude::*;
Expand All @@ -10,6 +11,12 @@ pub struct Znp {
tx: Mutex<Option<tokio::prelude::stream::SplitSink<tokio::codec::Framed<Serial, ZnpCodec>>>>,
cbs: mpsc::Sender<oneshot::Sender<ZpiCmd>>,
}
#[derive(Debug)]
pub enum SreqError {
BadResponse(cmd::error::Error),
SerialPortGone,
IO(std::io::Error),
}
impl Znp {
pub fn from_path<P>(path: P) -> Self
where
Expand All @@ -29,22 +36,27 @@ impl Znp {
let mut cbs_rx = cbs_rx;
let mut sp_rx = sp_rx;
while let Some(frame) = await!(sp_rx.next()) {
let frame = frame.unwrap();
use znp_codec::Type::{AREQ, SRSP};
match frame.typ() {
SRSP => {
if let Ok(Async::Ready(Some(cb))) = cbs_rx.poll() {
cb.send(frame).unwrap();
} else {
eprintln!("Unexpected SRSP: {:?}", frame);
panic!("SRSP no one was waiting for");
}
}
AREQ => {
use crate::cmd::Areq;
println!("AREQ:{:?}", Areq::from_subsys(frame));
match frame {
Err(err) => {
eprintln!("{}", err);
break;
}
_ => panic!("incoming POLL or SREQ"),
Ok(frame) => match frame.typ() {
SRSP => {
if let Ok(Async::Ready(Some(cb))) = cbs_rx.poll() {
cb.send(frame).unwrap();
} else {
eprintln!("Unexpected SRSP: {:?}", frame);
panic!("SRSP no one was waiting for");
}
}
AREQ => {
use crate::cmd::Areq;
println!("AREQ:{:?}", Areq::from_subsys(frame));
}
_ => panic!("incoming POLL or SREQ"),
},
}
}
},
Expand All @@ -54,18 +66,24 @@ impl Znp {
cbs: ctx,
}
}
pub async fn sreq<S>(&mut self, req: S) -> S::Srsp
pub async fn sreq<S>(&mut self, req: S) -> Result<S::Srsp, SreqError>
where
S: Sreq + 'static,
{
// acquire writing rights
let mut tx_lock = await!(self.tx.lock());
let send = tx_lock.take().unwrap().send(req.frame());
// serial port can be gone if there has been an IO error before
let sp_tx = tx_lock.take().ok_or_else(|| SreqError::SerialPortGone)?;
let send = sp_tx.send(req.frame());
let (cb_tx, cb_rx) = oneshot::channel();
await!(self.cbs.clone().send(cb_tx)).unwrap();
let sp_tx = await!(send).unwrap();
let srsp = await!(cb_rx).unwrap();
let srsp = S::parse_res(srsp);
let register_callback = await!(self.cbs.clone().send(cb_tx));
register_callback.map_err(|_| SreqError::SerialPortGone)?;
let send_res = await!(send);
let sp_tx = send_res.map_err(|err| SreqError::IO(err))?;
let srsp = await!(cb_rx).map_err(|_| SreqError::SerialPortGone)?;
let srsp = S::parse_res(srsp)
.map_err(|err| SreqError::BadResponse(cmd::error::Error::from(err)))?;
*tx_lock = Some(sp_tx);
srsp
Ok(srsp)
}
}
6 changes: 4 additions & 2 deletions znp-rs/src/znp_codec.rs
Expand Up @@ -99,8 +99,10 @@ impl Decoder for ZnpCodec {
// Drop: FCS
frame.truncate(frame.len() - 1);
let cmd0 = frame[1];
let typ = Type::from_u8(cmd0 & 0xf0).unwrap();
let subsys = Subsys::from_u8(cmd0 & 0xf).unwrap();
let typ = Type::from_u8(cmd0 & 0xf0)
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Unknown Type"))?;
let subsys = Subsys::from_u8(cmd0 & 0xf)
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Unknown Subsystem"))?;
let cmd_id = frame[2];
// Skip: Length + Cmd0 + Cmd1
frame.advance(3);
Expand Down

0 comments on commit a18c79d

Please sign in to comment.