Skip to content

Commit

Permalink
feat: packet parsing is now an iterator
Browse files Browse the repository at this point in the history
Former-commit-id: 19e3c13
  • Loading branch information
dignifiedquire committed Nov 20, 2018
1 parent 2de2da5 commit 9335fbe
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 64 deletions.
7 changes: 4 additions & 3 deletions src/composed/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::io::{Cursor, Read, Seek};

use armor::{self, BlockType};
use errors::{Error, Result};
use packet::{self, Packet};
use packet::{Packet, PacketParser};

pub trait Deserializable: Sized {
/// Parse a single byte encoded composition.
Expand Down Expand Up @@ -79,8 +79,9 @@ pub trait Deserializable: Sized {

/// Parse a list of compositions in raw byte format.
fn from_bytes_many(bytes: impl Read) -> Result<Vec<Self>> {
let packets = packet::parser(bytes)?;

let packets = PacketParser::new(bytes)
.filter(|p| p.is_ok()) // for now we are skipping any packets that we failed to parse
.map(|p| p.expect("filtered"));
Self::from_packets(packets)
}

Expand Down
132 changes: 72 additions & 60 deletions src/packet/many.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,75 +7,85 @@ use errors::{Error, Result};
use packet::packet_sum::Packet;
use packet::single;

/// Parse packets, in a streaming fashion from the given reader.
pub fn parser(mut input: impl Read) -> Result<Vec<Packet>> {
// maximum size of our buffer
let max_capacity = 1024 * 1024 * 1024;
// the inital capacity of our buffer
// TODO: use a better value than a random guess
let mut capacity = 1024;
let mut b = Buffer::with_capacity(capacity);

let mut packets = Vec::new();
let mut needed: Option<Needed> = None;

let mut second_round = false;

loop {
// read some data
let sz = input.read(b.space())?;
b.fill(sz);

// if there's no more available data in the buffer after a write, that means we reached
// the end of the input
if b.available_data() == 0 {
break;
}
const MAX_CAPACITY: usize = 1024 * 1024 * 1024;

if needed.is_some() && sz == 0 {
if second_round {
// Cancel if we didn't receive enough bytes from our source, the second time around.
return Err(Error::PacketIncomplete);
}
second_round = true;
pub struct PacketParser<R> {
inner: R,
capacity: usize,
buffer: Buffer,
}

impl<R: Read> PacketParser<R> {
pub fn new(inner: R) -> Self {
PacketParser {
inner,
// the inital capacity of our buffer
// TODO: use a better value than a random guess
capacity: 1024,
buffer: Buffer::with_capacity(1024),
}
}
}

impl<R: Read> Iterator for PacketParser<R> {
type Item = Result<Packet>;

fn next(&mut self) -> Option<Self::Item> {
let b = &mut self.buffer;
let mut needed: Option<Needed> = None;
let mut second_round = false;

loop {
let length = {
match single::parser(b.data()) {
Ok((remaining, Ok(p))) => {
info!("-- parsed packet {:?} --", p.tag());
packets.push(p);
b.data().offset(remaining)
}
Ok((remaining, Err(err))) => {
warn!("parse error: {:?}", err);
// for now we are simply skipping invalid packets
b.data().offset(remaining)
}
Err(err) => match err {
Error::Incomplete(n) => {
needed = Some(n);
break;
}
_ => return Err(err),
},
// read some data
let sz = match self.inner.read(b.space()) {
Ok(sz) => sz,
Err(_) => {
return None;
}
};
b.fill(sz);

b.consume(length);
}
// If there's no more available data in the buffer after a write, that means we reached
// the end of the input.
if b.available_data() == 0 {
return None;
}

// if the parser returned `Incomplete`, and it needs more data than the buffer can hold, we grow the buffer.
if let Some(Needed::Size(sz)) = needed {
if sz > b.capacity() && capacity * 2 < max_capacity {
capacity *= 2;
b.grow(capacity);
if needed.is_some() && sz == 0 {
if second_round {
// Cancel if we didn't receive enough bytes from our source, the second time around.
return Some(Err(Error::PacketIncomplete));
}
second_round = true;
}

let res = match single::parser(b.data()).map(|(r, p)| (b.data().offset(r), p)) {
Ok((l, p)) => Some((l, p)),
Err(err) => match err {
Error::Incomplete(n) => {
needed = Some(n);
None
}
_ => return Some(Err(err)),
},
};

if let Some((length, p)) = res {
info!("got packet: {:?}", p);
b.consume(length);
return Some(p);
}

// if the parser returned `Incomplete`, and it needs more data than the buffer can hold, we grow the buffer.
if let Some(Needed::Size(sz)) = needed {
if sz > b.capacity() && self.capacity * 2 < MAX_CAPACITY {
self.capacity *= 2;
let capacity = self.capacity;
b.grow(capacity);
}
}
}
}

Ok(packets)
}

#[cfg(test)]
Expand Down Expand Up @@ -131,9 +141,11 @@ mod tests {
offset != &"38544535".to_string() // bad attribute size
});

let actual_tags = parser(file).unwrap();
for ((_offset, tag, e), packet) in expected_tags.zip(actual_tags.iter()) {
let actual_tags = PacketParser::new(file).filter(|p| p.is_ok());
for ((_offset, tag, e), packet) in expected_tags.zip(actual_tags) {
let e = e.as_ref().unwrap();
let packet = packet.unwrap();

// println!("\n-- checking: {:?} {}", packet.tag(), e);

let tag = Tag::from_u8(tag.parse().unwrap()).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/packet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,5 @@ pub use self::trust::*;
pub use self::user_attribute::*;
pub use self::user_id::*;

pub use self::many::parser;
pub use self::many::*;
pub use self::packet_sum::*;

0 comments on commit 9335fbe

Please sign in to comment.