Permalink
Browse files

Implement the Iterator/IntoIterator protocol proper (#8)

* Remove unused Serialize bound on Receiver

* Implement an iteration externally

This simplifies ownership of the receiver
  • Loading branch information...
skade authored and blt committed Feb 2, 2017
1 parent 7c0a410 commit dc82da62dce32ecc562df36aed5660fbcd67d95d
Showing with 77 additions and 33 deletions.
  1. +1 −1 README.md
  2. +2 −2 benches/mpsc_snd_rcv.rs
  3. +16 −16 src/lib.rs
  4. +53 −9 src/receiver.rs
  5. +1 −1 tests/afl_crashes.rs
  6. +4 −4 tests/exact_return.rs
View
@@ -24,7 +24,7 @@ let dir = tempdir::TempDir::new("hopper").unwrap();
let (mut snd, mut rcv) = hopper::channel("example", dir.path()).unwrap();
snd.send(9);
assert_eq!(Some(9), rcv.next());
assert_eq!(Some(9), rcv.iter().next());
```
The primary difference here is that you must provide a name for the channel and
View
@@ -23,7 +23,7 @@ fn bench_snd_rcv(b: &mut Bencher) {
let (mut snd, mut rcv) = hopper::channel("bench_snd", dir.path()).unwrap();
b.iter(|| {
snd.send(12u64);
rcv.next().unwrap();
rcv.iter().next().unwrap();
});
}
@@ -36,7 +36,7 @@ fn bench_all_snd_all_rcv(b: &mut Bencher) {
snd.send(89u64);
}
for _ in 0..10_000 {
rcv.next().unwrap();
rcv.iter().next().unwrap();
}
});
}
View
@@ -129,7 +129,7 @@ pub enum Error {
/// let (mut snd, mut rcv) = hopper::channel("example", dir.path()).unwrap();
///
/// snd.send(9);
/// assert_eq!(Some(9), rcv.next());
/// assert_eq!(Some(9), rcv.iter().next());
/// ```
pub fn channel<T>(name: &str, data_dir: &Path) -> Result<(Sender<T>, Receiver<T>), Error>
where T: Serialize + Deserialize
@@ -183,33 +183,33 @@ mod test {
snd.send(1);
assert_eq!(Some(1), rcv.next());
assert_eq!(Some(1), rcv.iter().next());
}
#[test]
fn zero_item_round_trip() {
let dir = tempdir::TempDir::new("hopper").unwrap();
let (mut snd, mut rcv) = channel("zero_item_round_trip", dir.path()).unwrap();
assert_eq!(None, rcv.next());
assert_eq!(None, rcv.iter().next());
snd.send(1);
assert_eq!(Some(1), rcv.next());
assert_eq!(Some(1), rcv.iter().next());
}
#[test]
fn all_mem_buffer_round_trip() {
let dir = tempdir::TempDir::new("hopper").unwrap();
let (mut snd, mut rcv) = channel("zero_item_round_trip", dir.path()).unwrap();
assert_eq!(None, rcv.next());
assert_eq!(None, rcv.iter().next());
let cap = 1022;
for _ in 0..cap {
snd.send(1);
}
for _ in 0..cap {
assert_eq!(Some(1), rcv.next());
assert_eq!(Some(1), rcv.iter().next());
}
}
@@ -218,14 +218,14 @@ mod test {
let dir = tempdir::TempDir::new("hopper").unwrap();
let (mut snd, mut rcv) = channel("zero_item_round_trip", dir.path()).unwrap();
assert_eq!(None, rcv.next());
assert_eq!(None, rcv.iter().next());
let cap = 1024;
for _ in 0..cap {
snd.send(1);
}
for _ in 0..cap {
assert_eq!(Some(1), rcv.next());
assert_eq!(Some(1), rcv.iter().next());
}
}
@@ -234,14 +234,14 @@ mod test {
let dir = tempdir::TempDir::new("hopper").unwrap();
let (mut snd, mut rcv) = channel("zero_item_round_trip", dir.path()).unwrap();
assert_eq!(None, rcv.next());
assert_eq!(None, rcv.iter().next());
let cap = 2048;
for _ in 0..cap {
snd.send(1);
}
for _ in 0..cap {
assert_eq!(Some(1), rcv.next());
assert_eq!(Some(1), rcv.iter().next());
}
}
@@ -250,14 +250,14 @@ mod test {
let dir = tempdir::TempDir::new("hopper").unwrap();
let (mut snd, mut rcv) = channel("zero_item_round_trip", dir.path()).unwrap();
assert_eq!(None, rcv.next());
assert_eq!(None, rcv.iter().next());
let cap = 4048;
for _ in 0..cap {
snd.send(1);
}
for _ in 0..cap {
assert_eq!(Some(1), rcv.next());
assert_eq!(Some(1), rcv.iter().next());
}
}
@@ -274,7 +274,7 @@ mod test {
}
for ev in evs {
assert_eq!(Some(ev), rcv.next());
assert_eq!(Some(ev), rcv.iter().next());
}
TestResult::passed()
}
@@ -300,7 +300,7 @@ mod test {
let mut total = evs.len();
for ev in evs {
println!("REMAINING: {}", total);
assert_eq!(Some(ev), rcv.next());
assert_eq!(Some(ev), rcv.iter().next());
total -= 1;
}
TestResult::passed()
@@ -338,7 +338,7 @@ mod test {
// value from the rcv, then marking it out of tst_pylds
for _ in 0..(max_sz * max_thrs) {
loop {
if let Some(nxt) = rcv.next() {
if let Some(nxt) = rcv.iter().next() {
let idx = tst_pylds.binary_search(&nxt).expect("DID NOT FIND ELEMENT");
tst_pylds.remove(idx);
break;
@@ -385,7 +385,7 @@ mod test {
joins.push(thread::spawn(move || {
for _ in 0..total_pylds {
loop {
if let Some(_) = rcv.next() {
if let Some(_) = rcv.iter().next() {
break;
}
}
View
@@ -1,9 +1,10 @@
use bincode::serde::deserialize;
use serde::{Deserialize, Serialize};
use serde::Deserialize;
use std::fs;
use std::io::{BufReader, ErrorKind, Read, SeekFrom, Seek};
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::iter::IntoIterator;
use private;
@@ -65,15 +66,8 @@ impl<T> Receiver<T>
fs_lock: fs_lock,
})
}
}
impl<T> Iterator for Receiver<T>
where T: Serialize + Deserialize
{
type Item = T;
fn next(&mut self) -> Option<T> {
fn next_value(&mut self) -> Option<T> {
let mut sz_buf = [0; 4];
let mut syn = self.fs_lock.lock().expect("Receiver fs_lock was poisoned!");
// The receive loop
@@ -184,4 +178,54 @@ impl<T> Iterator for Receiver<T>
}
None
}
/// An iterator over messages on a receiver, this iterator will block
/// whenever `next` is called, waiting for a new message, and `None` will be
/// returned when the corresponding channel has hung up.
pub fn iter<'a>(&'a mut self) -> Iter<'a, T> {
Iter { rx: self }
}
}
#[derive(Debug)]
pub struct Iter<'a, T: 'a + Deserialize> {
rx: &'a mut Receiver<T>
}
impl<'a, T> Iterator for Iter<'a, T>
where T: Deserialize
{
type Item = T;
fn next(&mut self) -> Option<T> {
self.rx.next_value()
}
}
#[derive(Debug)]
pub struct IntoIter<T: Deserialize> {
rx: Receiver<T>
}
impl<T> Iterator for IntoIter<T>
where T: Deserialize
{
type Item = T;
fn next(&mut self) -> Option<T> {
self.rx.next_value()
}
}
impl<T> IntoIterator for Receiver<T>
where T: Deserialize
{
type Item = T;
type IntoIter = IntoIter<T>;
fn into_iter(self) -> IntoIter<T> {
IntoIter { rx: self }
}
}
View
@@ -51,7 +51,7 @@ mod integration {
for _ in 0..100 {
thread::sleep(dur);
loop {
if let Some(i) = rcv.next() {
if let Some(i) = rcv.iter().next() {
count += 1;
if max_thrs == 1 {
assert_eq!(i, nxt);
View
@@ -14,7 +14,7 @@ mod integration {
let (mut snd, mut rcv) =
channel_with_max_bytes("zero_item_round_trip", dir.path(), 1_048_576).unwrap();
assert_eq!(None, rcv.next());
assert_eq!(None, rcv.iter().next());
let max = 10;
@@ -25,7 +25,7 @@ mod integration {
let mut nxt = 0;
let mut count = 0;
loop {
match rcv.next() {
match rcv.iter().next() {
Some(i) => {
count += 1;
assert_eq!(i, nxt);
@@ -61,7 +61,7 @@ mod integration {
for _ in 0..250 {
thread::sleep(dur);
loop {
if let Some(_) = rcv.next() {
if let Some(_) = rcv.iter().next() {
count += 1;
} else {
break;
@@ -117,7 +117,7 @@ mod integration {
for _ in 0..250 {
thread::sleep(dur);
loop {
if let Some(_) = rcv.next() {
if let Some(_) = rcv.iter().next() {
count += 1;
} else {
break;

0 comments on commit dc82da6

Please sign in to comment.