Permalink
Browse files

More fiddling along these lines

WIP commit. I've removed the old Fsync and am now relying on an
mmap'ed structure. The existing implementation will not work but
I think it can be made to.

Signed-off-by: Brian L. Troutwine <blt@postmates.com>
  • Loading branch information...
Brian L. Troutwine
Brian L. Troutwine committed Dec 3, 2017
1 parent e80e851 commit 392238f4961874eb8b4b68b787e51b81d7ba7387
Showing with 402 additions and 386 deletions.
  1. +2 −1 Cargo.toml
  2. +71 −0 src/common.rs
  3. +41 −14 src/lib.rs
  4. +0 −51 src/private.rs
  5. +173 −157 src/receiver.rs
  6. +115 −163 src/sender.rs
View
@@ -6,7 +6,7 @@ license = "MIT"
name = "hopper"
readme = "README.md"
repository = "https://github.com/postmates/hopper"
version = "0.3.4"
version = "0.4.0-pre"
[dev-dependencies]
quickcheck = "0.4"
@@ -15,3 +15,4 @@ tempdir = "0.3"
[dependencies]
bincode = "0.9"
serde = "1.0"
memmap = "0.6"
View
@@ -0,0 +1,71 @@
use std::{fs, mem, path};
use memmap;
#[inline]
pub fn u8tou32abe(v: &[u8]) -> u32 {
u32::from(v[3]) + (u32::from(v[2]) << 8) + (u32::from(v[1]) << 24) + (u32::from(v[0]) << 16)
}
#[inline]
pub fn u32tou8abe(v: u32) -> [u8; 4] {
[v as u8, (v >> 8) as u8, (v >> 24) as u8, (v >> 16) as u8]
}
#[derive(Debug, Clone)]
pub struct Config {
pub maximum_queue_in_bytes: u32,
pub root_dir: path::PathBuf,
}
#[derive(Debug)]
pub struct HIndex {
root: path::PathBuf,
path: path::PathBuf,
block: memmap::MmapMut,
}
impl HIndex {
pub fn new(data_dir: &path::Path) -> Result<HIndex, super::Error>
{
if !data_dir.is_dir() {
return Err(super::Error::NoSuchDirectory);
}
let idx = data_dir.join("index");
let file = fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(&idx).unwrap(); // TODO no unwrap
file.set_len((mem::size_of::<u32>() * 2) as u64).unwrap(); // TODO no unwrap
let mmap = unsafe { memmap::MmapMut::map_mut(&file).unwrap() /* TODO no unwrap */ };
Ok(HIndex {
root: data_dir.to_path_buf(),
path: idx,
block: mmap
})
}
pub fn sender_idx(&self) -> u32 {
u8tou32abe(&self.block[0..3])
}
// TODO not safe because of multiple senders writing to one location
pub fn set_sender_idx(&mut self, val: u32) -> () {
let abe = u32tou8abe(val);
for i in 0..4 {
self.block[i] = abe[i];
}
}
pub fn receiver_idx(&self) -> u32 {
u8tou32abe(&self.block[4..7])
}
pub fn set_receiver_idx(&mut self, val: u32) -> () {
let abe = u32tou8abe(val);
for i in 5..8 {
self.block[i] = abe[i];
}
}
}
View
@@ -1,5 +1,5 @@
#![deny(missing_docs, missing_debug_implementations, missing_copy_implementations,
trivial_numeric_casts, unsafe_code, unstable_features, unused_import_braces,
trivial_numeric_casts, unstable_features, unused_import_braces,
unused_qualifications)]
//! hopper - an unbounded mpsc with bounded memory
//!
@@ -81,10 +81,11 @@
//! integrity. We are open to improvements in this area.
extern crate serde;
extern crate bincode;
extern crate memmap;
mod common;
mod receiver;
mod sender;
mod private;
pub use self::receiver::Receiver;
pub use self::sender::Sender;
@@ -93,9 +94,36 @@ use serde::Serialize;
use serde::de::DeserializeOwned;
use std::fs;
use std::path::Path;
use std::sync;
use std::mem;
// Here's how I think it'll work:
//
// There's an 'index' file. This is two u32s back to back. The left u32 is the
// 'sender' index, the right is the 'receiver' index. It looks like this
// on-disk:
//
// sender receiver
// |-------------------------------|-------------------------------|
// u32 u32
//
// There's concurrent access to the sender side of things, singleton access to
// receiver. The index file means we don't have to poll the disk to find our
// location.
//
// The queue files look like this:
//
// size (31 bits) blob
// |------------------------------\-|------------ ... --------------|
// u16 ^ read
//
// The 'size' gives the total size of the blob and must fit into 31 bits. This
// limits a blob to 268 megabytes. That's... probably enough. 'size' is set by a
// sender and read by the receiver. The 'read' signals to the receiver whether
// it has read this record yet or not. It is set to 0 by a sender on write, set
// to 1 by the receiver on read. 'read' allows us to seek through the structure
// for our last read position.
/// Defines the errors that hopper will bubble up
///
/// Hopper's error story is pretty bare right now. Hopper should be given sole
@@ -146,25 +174,24 @@ where
pub fn channel_with_max_bytes<T>(
name: &str,
data_dir: &Path,
max_bytes: usize,
max_bytes: u32,
) -> Result<(Sender<T>, Receiver<T>), Error>
where
T: Serialize + DeserializeOwned,
{
use std::sync::Arc;
let root = data_dir.join(name);
let snd_root = root.clone();
let rcv_root = root.clone();
let sz = mem::size_of::<T>() as u32;
let max_bytes = if max_bytes < sz { sz } else { max_bytes };
let config = common::Config {
maximum_queue_in_bytes: max_bytes,
root_dir: root.clone(),
};
if !root.is_dir() {
fs::create_dir_all(root).expect("could not create directory");
}
let cap: usize = 1024;
let sz = mem::size_of::<T>();
let max_bytes = if max_bytes < sz { sz } else { max_bytes };
let fs_lock = sync::Arc::new(sync::Mutex::new(private::FsSync::new(cap)));
let sender = try!(Sender::new(name, &snd_root, max_bytes, Arc::clone(&fs_lock)));
let receiver = try!(Receiver::new(&rcv_root, fs_lock));
let sender = Sender::new(config.clone())?;
let receiver = Receiver::new(config)?;
Ok((sender, receiver))
}
View

This file was deleted.

Oops, something went wrong.
Oops, something went wrong.

0 comments on commit 392238f

Please sign in to comment.