Skip to content

Commit

Permalink
refactor: Merge Publish.{qos,pid} into Publish.qospid.
Browse files Browse the repository at this point in the history
This avoids the gotcha (both for mqttrs and its users) of setting AtMostOnce with a pid, or
AtLeastonce/ExactlyOnce without one.

Fixes 00imvj00#11

Breaking change.
  • Loading branch information
vincentdephily committed Oct 16, 2019
1 parent 4b529c3 commit 8a57f67
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 33 deletions.
9 changes: 6 additions & 3 deletions src/codec_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,19 @@ prop_compose! {
}
prop_compose! {
fn stg_publish()(dup in bool::ANY,
qos in 0u8..3,
qos in stg_qos(),
pid in stg_pid(),
retain in bool::ANY,
topic_name in stg_topic(),
payload in vec(0u8..255u8, 1..300)) -> Packet {
Packet::Publish(Publish{dup,
qos: QoS::from_u8(qos).unwrap(),
qospid: match qos {
QoS::AtMostOnce => QosPid::AtMostOnce,
QoS::AtLeastOnce => QosPid::AtLeastOnce(pid),
QoS::ExactlyOnce => QosPid::ExactlyOnce(pid),
},
retain,
topic_name,
pid: if qos == 0 { None } else { Some(pid) },
payload})
}
}
Expand Down
11 changes: 5 additions & 6 deletions src/decoder_test.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#![allow(unused_imports)]

use crate::{
decoder, Connack, ConnectReturnCode, Packet, PacketIdentifier, QoS, SubscribeReturnCodes,
SubscribeTopic,
decoder, Connack, ConnectReturnCode, Packet, PacketIdentifier, QoS, QosPid,
SubscribeReturnCodes, SubscribeTopic,
};
use bytes::BytesMut;

Expand Down Expand Up @@ -102,7 +102,7 @@ fn test_publish() {
Some(Packet::Publish(p)) => {
assert_eq!(p.dup, false);
assert_eq!(p.retain, false);
assert_eq!(p.qos, QoS::AtMostOnce);
assert_eq!(p.qospid, QosPid::AtMostOnce);
assert_eq!(p.topic_name, "a/b");
assert_eq!(String::from_utf8(p.payload).unwrap(), "hello");
}
Expand All @@ -112,7 +112,7 @@ fn test_publish() {
Some(Packet::Publish(p)) => {
assert_eq!(p.dup, true);
assert_eq!(p.retain, false);
assert_eq!(p.qos, QoS::AtMostOnce);
assert_eq!(p.qospid, QosPid::AtMostOnce);
assert_eq!(p.topic_name, "a/b");
assert_eq!(String::from_utf8(p.payload).unwrap(), "hello");
}
Expand All @@ -122,9 +122,8 @@ fn test_publish() {
Some(Packet::Publish(p)) => {
assert_eq!(p.dup, true);
assert_eq!(p.retain, true);
assert_eq!(p.qos, QoS::ExactlyOnce);
assert_eq!(p.qospid, QosPid::from_u8u16(2, 10).unwrap());
assert_eq!(p.topic_name, "a/b");
assert_eq!(p.pid, Some(PacketIdentifier::new(10).unwrap()));
assert_eq!(String::from_utf8(p.payload).unwrap(), "hello");
}
_ => panic!("Should not be None"),
Expand Down
5 changes: 2 additions & 3 deletions src/encoder_test.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#[allow(unused_imports)]
use crate::{
decoder, encoder, Connack, Connect, ConnectReturnCode, Packet, PacketIdentifier, Protocol,
Publish, QoS, Suback, Subscribe, SubscribeReturnCodes, SubscribeTopic, Unsubscribe,
Publish, QoS, QosPid, Suback, Subscribe, SubscribeReturnCodes, SubscribeTopic, Unsubscribe,
};

#[allow(unused_imports)]
Expand Down Expand Up @@ -50,10 +50,9 @@ fn test_connack() {
fn test_publish() {
let packet = Publish {
dup: false,
qos: QoS::ExactlyOnce,
qospid: QosPid::from_u8u16(2, 10).unwrap(),
retain: true,
topic_name: "asdf".to_string(),
pid: Some(PacketIdentifier::new(10).unwrap()),
payload: vec!['h' as u8, 'e' as u8, 'l' as u8, 'l' as u8, 'o' as u8],
};
let mut buffer = BytesMut::with_capacity(1024);
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub use crate::{
packet::Packet,
publish::Publish,
subscribe::{Suback, Subscribe, SubscribeReturnCodes, SubscribeTopic, Unsubscribe},
utils::{ConnectReturnCode, LastWill, PacketIdentifier, Protocol, QoS},
utils::{ConnectReturnCode, LastWill, PacketIdentifier, Protocol, QoS, QosPid},
};

const MULTIPLIER: usize = 0x80 * 0x80 * 0x80 * 0x80;
Expand Down
33 changes: 18 additions & 15 deletions src/publish.rs
Original file line number Diff line number Diff line change
@@ -1,41 +1,42 @@
use crate::{encoder, utils, Header, PacketIdentifier, QoS};
use crate::{encoder, utils, Header, PacketIdentifier, QoS, QosPid};
use bytes::{BufMut, BytesMut};
use std::io;

#[derive(Debug, Clone, PartialEq)]
pub struct Publish {
pub dup: bool,
pub qos: QoS,
pub qospid: QosPid,
pub retain: bool,
pub topic_name: String,
pub pid: Option<PacketIdentifier>,
pub payload: Vec<u8>,
}

impl Publish {
pub fn from_buffer(header: &Header, buffer: &mut BytesMut) -> Result<Self, io::Error> {
let topic_name = utils::read_string(buffer);

let pid = if header.qos()? == QoS::AtMostOnce {
None
} else {
Some(PacketIdentifier::from_buffer(buffer)?)
let qospid = match header.qos()? {
QoS::AtMostOnce => QosPid::AtMostOnce,
QoS::AtLeastOnce => QosPid::AtLeastOnce(PacketIdentifier::from_buffer(buffer)?),
QoS::ExactlyOnce => QosPid::ExactlyOnce(PacketIdentifier::from_buffer(buffer)?),
};

let payload = buffer.to_vec();
Ok(Publish {
dup: header.dup(),
qos: header.qos()?,
qospid,
retain: header.retain(),
topic_name,
pid,
payload,
})
}
pub fn to_buffer(&self, buffer: &mut BytesMut) -> Result<(), io::Error> {
// Header
let mut header_u8: u8 = 0b00110000 as u8;
header_u8 |= (self.qos.to_u8()) << 1;
let mut header_u8: u8 = match self.qospid {
QosPid::AtMostOnce => 0b00110000,
QosPid::AtLeastOnce(_) => 0b00110010,
QosPid::ExactlyOnce(_) => 0b00110100,
};
if self.dup {
header_u8 |= 0b00001000 as u8;
};
Expand All @@ -46,8 +47,8 @@ impl Publish {

// Length: topic (2+len) + pid (0/2) + payload (len)
let length = self.topic_name.len()
+ match self.qos {
QoS::AtMostOnce => 2,
+ match self.qospid {
QosPid::AtMostOnce => 2,
_ => 4,
}
+ self.payload.len();
Expand All @@ -57,8 +58,10 @@ impl Publish {
encoder::write_string(self.topic_name.as_ref(), buffer)?;

// Pid
if self.qos != QoS::AtMostOnce {
self.pid.unwrap().to_buffer(buffer);
match self.qospid {
QosPid::AtMostOnce => (),
QosPid::AtLeastOnce(pid) => pid.to_buffer(buffer),
QosPid::ExactlyOnce(pid) => pid.to_buffer(buffer),
}

// Payload
Expand Down
25 changes: 20 additions & 5 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ impl PacketIdentifier {
pub fn new(u: u16) -> Result<Self, io::Error> {
match NonZeroU16::new(u) {
Some(nz) => Ok(PacketIdentifier(nz)),
None => Err(io::Error::new(
io::ErrorKind::InvalidData,
"Zero PacketIdentifier",
)),
None => Err(io::Error::new(io::ErrorKind::InvalidData, "Pid == 0")),
}
}
pub fn get(self) -> u16 {
Expand Down Expand Up @@ -52,7 +49,7 @@ impl QoS {
0 => Ok(QoS::AtMostOnce),
1 => Ok(QoS::AtLeastOnce),
2 => Ok(QoS::ExactlyOnce),
_ => Err(io::Error::new(io::ErrorKind::InvalidData, "")),
_ => Err(io::Error::new(io::ErrorKind::InvalidData, "Qos > 2")),
}
}
#[inline]
Expand All @@ -61,6 +58,24 @@ impl QoS {
}
}

pub type Pid = PacketIdentifier;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QosPid {
AtMostOnce,
AtLeastOnce(Pid),
ExactlyOnce(Pid),
}
impl QosPid {
pub fn from_u8u16(qos: u8, pid: u16) -> Result<Self, io::Error> {
match qos {
0 => Ok(QosPid::AtMostOnce),
1 => Ok(QosPid::AtLeastOnce(PacketIdentifier::new(pid)?)),
2 => Ok(QosPid::ExactlyOnce(PacketIdentifier::new(pid)?)),
_ => Err(io::Error::new(io::ErrorKind::InvalidData, "Qos > 2")),
}
}
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ConnectReturnCode {
Accepted,
Expand Down

0 comments on commit 8a57f67

Please sign in to comment.