Fixes #72 - Cache directory uses weak ref. Introduced CacheInfo object.
fulmicoton committed Feb 20, 2017
1 parent 3129701 commit 062e38a
Showing 3 changed files with 240 additions and 64 deletions.
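
The change replaces the directory's HashMap<PathBuf, MmapReadOnly> cache with an MmapCache that holds Weak<Mmap> handles, so a mapping is released once the last reader drops it, and introduces a CacheInfo object (hit/miss counters plus the list of currently mmapped paths) that can be inspected through get_cache_info(). For orientation, here is a minimal, std-only sketch of the same weak-reference caching pattern; the names (WeakCache, Resource) are made up for illustration and a plain Vec<u8> read from disk stands in for the real memory map, so this is not the crate's actual API.

use std::collections::HashMap;
use std::fs;
use std::io;
use std::path::PathBuf;
use std::sync::{Arc, Weak};

// Stand-in for the memory-mapped file handle.
type Resource = Vec<u8>;

#[derive(Default)]
struct WeakCache {
    hits: usize,
    misses: usize,
    entries: HashMap<PathBuf, Weak<Resource>>,
}

impl WeakCache {
    // Returns a strong handle, reusing the cached one while it is
    // still alive, and reloading the file otherwise.
    fn get(&mut self, path: PathBuf) -> io::Result<Arc<Resource>> {
        if let Some(weak) = self.entries.get(&path) {
            if let Some(strong) = weak.upgrade() {
                self.hits += 1;
                return Ok(strong);
            }
        }
        self.misses += 1;
        let strong = Arc::new(fs::read(&path)?);
        self.entries.insert(path, Arc::downgrade(&strong));
        Ok(strong)
    }

    // Evicts entries whose resource has already been dropped.
    fn cleanup(&mut self) {
        self.entries.retain(|_, weak| weak.upgrade().is_some());
    }
}

fn main() -> io::Result<()> {
    let mut cache = WeakCache::default();
    let data = cache.get(PathBuf::from("Cargo.toml"))?; // any readable file works
    println!("{} bytes, hits={}, misses={}", data.len(), cache.hits, cache.misses);
    drop(data);      // last strong reference gone...
    cache.cleanup(); // ...so the entry is evicted here
    assert!(cache.entries.is_empty());
    Ok(())
}

Upgrading the Weak succeeds only while some strong Arc is still alive; otherwise the entry is reopened and the stale slot replaced, which is the distinction the hit, miss_weak and miss_empty counters in the diff below keep track of.
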
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -14,10 +14,10 @@ keywords = ["search", "information", "retrieval"]

[dependencies]
byteorder = "1.0"
memmap = "0.5"
memmap = "0.4"
lazy_static = "0.2.1"
regex = "0.2"
fst = "0.1.35"
fst = "0.1.37"
atomicwrites = "0.1.3"
tempfile = "2.1"
rustc-serialize = "0.3"
298 changes: 238 additions & 60 deletions src/directory/mmap_directory.rs
@@ -1,27 +1,127 @@
use std::path::{Path, PathBuf};
use tempdir::TempDir;
use std::collections::HashMap;
use std::collections::hash_map::Entry as HashMapEntry;
use fst::raw::MmapReadOnly;
use std::fs::File;
use atomicwrites;
use std::sync::RwLock;
use std::fmt;
use std::io::Write;
use std::io;
use std::io::{Seek, SeekFrom};
use common::make_io_err;
use directory::Directory;
use directory::error::{OpenWriteError, FileError, OpenDirectoryError};
use directory::ReadOnlySource;
use directory::shared_vec_slice::SharedVecSlice;
use directory::WritePtr;
use std::io::BufWriter;
use fst::raw::MmapReadOnly;
use memmap::{Mmap, Protection};
use std::collections::hash_map::Entry as HashMapEntry;
use std::collections::HashMap;
use std::mem;
use std::convert::From;
use std::fmt;
use std::fs;
use std::fs::File;
use std::fs::OpenOptions;
use directory::error::{OpenWriteError, FileError, OpenDirectoryError};
use std::io;
use std::io::{Seek, SeekFrom};
use std::io::BufWriter;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::result;
use common::make_io_err;
use std::sync::Arc;
use std::fs;
use directory::shared_vec_slice::SharedVecSlice;
use std::sync::RwLock;
use std::sync::Weak;
use tempdir::TempDir;


fn open_mmap(full_path: &PathBuf) -> result::Result<Option<Arc<Mmap>>, FileError> {
    let convert_file_error = |err: io::Error| {
        if err.kind() == io::ErrorKind::NotFound {
            FileError::FileDoesNotExist(full_path.clone())
        }
        else {
            FileError::IOError(err)
        }
    };
    let file = File::open(&full_path).map_err(convert_file_error)?;
    if try!(file.metadata()).len() == 0 {
        // if the file size is 0, it will not be possible
        // to mmap the file, so we return an anonymous mmap_cache
        // instead.
        return Ok(None)
    }
    Ok(Some(Arc::new(Mmap::open(&file, Protection::Read)?)))
}

#[derive(Default,Clone,Debug,RustcDecodable,RustcEncodable)]
pub struct CacheCounters {
    hit: usize,
    miss_empty: usize,
    miss_weak: usize,
}

#[derive(Clone,Debug,RustcDecodable,RustcEncodable)]
pub struct CacheInfo {
    pub counters: CacheCounters,
    pub mmapped: Vec<PathBuf>,
}

#[derive(Default)]
struct MmapCache {
    counters: CacheCounters,
    cache: HashMap<PathBuf, Weak<Mmap>>,
}

impl MmapCache {

    fn cleanup(&mut self) {
        let mut new_cache = HashMap::new();
        mem::swap(&mut new_cache, &mut self.cache);
        self.cache = new_cache
            .into_iter()
            .filter(|&(_, ref weak_ref)| weak_ref.upgrade().is_some())
            .collect();
    }

    fn get_info(&mut self) -> CacheInfo {
        self.cleanup();
        let paths: Vec<PathBuf> = self.cache.keys()
            .cloned()
            .collect();
        CacheInfo {
            counters: self.counters.clone(),
            mmapped: paths,
        }
    }

    fn get_mmap(&mut self, full_path: PathBuf) -> Result<Option<Arc<Mmap>>, FileError> {
        if self.cache.len() > 100 {
            self.cleanup();
        }
        Ok(match self.cache.entry(full_path.clone()) {
            HashMapEntry::Occupied(mut occupied_entry) => {
                if let Some(mmap_arc) = occupied_entry.get().upgrade() {
                    self.counters.hit += 1;
                    Some(mmap_arc.clone())
                }
                else {
                    // The entry exists but the weak ref has been destroyed.
                    self.counters.miss_weak += 1;
                    if let Some(mmap_arc) = open_mmap(&full_path)? {
                        occupied_entry.insert(Arc::downgrade(&mmap_arc));
                        Some(mmap_arc)
                    }
                    else {
                        None
                    }
                }
            }
            HashMapEntry::Vacant(vacant_entry) => {
                self.counters.miss_empty += 1;
                if let Some(mmap_arc) = open_mmap(&full_path)? {
                    vacant_entry.insert(Arc::downgrade(&mmap_arc));
                    Some(mmap_arc)
                }
                else {
                    None
                }
            }
        })
    }
}

/// Directory storing data in files, read via mmap.
///
@@ -30,8 +130,9 @@ use directory::shared_vec_slice::SharedVecSlice;
#[derive(Clone)]
pub struct MmapDirectory {
    root_path: PathBuf,
    mmap_cache: Arc<RwLock<HashMap<PathBuf, MmapReadOnly>>>,
    mmap_cache: Arc<RwLock<MmapCache>>,
    _temp_directory: Arc<Option<TempDir>>,

}

impl fmt::Debug for MmapDirectory {
@@ -40,8 +141,6 @@ impl fmt::Debug for MmapDirectory {
    }
}



impl MmapDirectory {

    /// Creates a new MmapDirectory in a temporary directory.
@@ -53,13 +152,12 @@ impl MmapDirectory {
        let tempdir_path = PathBuf::from(tempdir.path());
        let directory = MmapDirectory {
            root_path: PathBuf::from(tempdir_path),
            mmap_cache: Arc::new(RwLock::new(HashMap::new())),
            mmap_cache: Arc::new(RwLock::new(MmapCache::default())),
            _temp_directory: Arc::new(Some(tempdir))
        };
        Ok(directory)
    }


    /// Opens a MmapDirectory in a directory.
    ///
    /// Returns an error if the `directory_path` does not
@@ -74,7 +172,7 @@ impl MmapDirectory {
        else {
            Ok(MmapDirectory {
                root_path: PathBuf::from(directory_path),
                mmap_cache: Arc::new(RwLock::new(HashMap::new())),
                mmap_cache: Arc::new(RwLock::new(MmapCache::default())),
                _temp_directory: Arc::new(None)
            })
        }
@@ -95,6 +193,14 @@ impl MmapDirectory {
        Ok(())
    }

    pub fn get_cache_info(&mut self) -> CacheInfo {
        self.mmap_cache
            .write()
            .expect("Mmap cache lock is poisoned.")
            .get_info()
    }


}

/// This Write wraps a File, but has the specificity of
@@ -128,47 +234,21 @@ impl Seek for SafeFileWriter {

impl Directory for MmapDirectory {



    fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, FileError> {
        debug!("Open Read {:?}", path);
        let full_path = self.resolve_path(path);

        let mut mmap_cache = try!(
            self.mmap_cache
                .write()
                .map_err(|_| {
                    make_io_err(format!("Failed to acquired write lock on mmap cache while reading {:?}", path))
                })
        );

        let mmap = match mmap_cache.entry(full_path.clone()) {
            HashMapEntry::Occupied(e) => {
                e.get().clone()
            }
            HashMapEntry::Vacant(vacant_entry) => {
                let file = try!(
                    File::open(&full_path).map_err(|err| {
                        if err.kind() == io::ErrorKind::NotFound {
                            FileError::FileDoesNotExist(full_path.clone())
                        }
                        else {
                            FileError::IOError(err)
                        }
                    })
                );
                if try!(file.metadata()).len() == 0 {
                    // if the file size is 0, it will not be possible
                    // to mmap the file, so we return an anonymous mmap_cache
                    // instead.
                    return Ok(ReadOnlySource::Anonymous(SharedVecSlice::empty()))
                }
                let new_mmap = try!(MmapReadOnly::open(&file));
                vacant_entry.insert(new_mmap.clone());
                new_mmap
            }
        };
        Ok(ReadOnlySource::Mmap(mmap))
        let mut mmap_cache = self.mmap_cache
            .write()
            .map_err(|_| {
                make_io_err(format!("Failed to acquired write lock on mmap cache while reading {:?}", path))
            })?;

        Ok(mmap_cache.get_mmap(full_path)?
            .map(MmapReadOnly::from)
            .map(ReadOnlySource::Mmap)
            .unwrap_or(ReadOnlySource::Anonymous(SharedVecSlice::empty()))
        )
    }

    fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
@@ -214,7 +294,7 @@ impl Directory for MmapDirectory {
        // Removing the entry in the MMap cache.
        // The munmap will appear on Drop,
        // when the last reference is gone.
        mmap_cache.remove(&full_path);
        mmap_cache.cache.remove(&full_path);
        try!(fs::remove_file(&full_path));
        try!(self.sync_directory());
        Ok(())
@@ -240,3 +320,101 @@ }
    }

}




#[cfg(test)]
mod tests {

    // There are more tests in directory/mod.rs
    // The following tests are specific to the MmapDirectory

    use super::*;

    #[test]
    fn test_open_empty() {
        // empty file is actually an edge case because those
        // cannot be mmapped.
        //
        // In that case the directory returns a SharedVecSlice.
        let mut mmap_directory = MmapDirectory::create_from_tempdir().unwrap();
        let path = PathBuf::from("test");
        {
            let mut w = mmap_directory.open_write(&path).unwrap();
            w.flush().unwrap();
        }
        let readonlymap = mmap_directory.open_read(&path).unwrap();
        assert_eq!(readonlymap.len(), 0);
    }

    #[test]
    fn test_cache() {


        let content = "abc".as_bytes();

        // here we test if the cache releases
        // mmaps correctly.
        let mut mmap_directory = MmapDirectory::create_from_tempdir().unwrap();
        let paths: Vec<PathBuf> = (0..10)
            .map(|i| PathBuf::from(&*format!("file_{}", i)))
            .collect();
        {
            for path in &paths {
                let mut w = mmap_directory.open_write(path).unwrap();
                w.write(content).unwrap();
                w.flush().unwrap();
            }
        }
        {
            for path in &paths {
                {
                    let _r = mmap_directory.open_read(path).unwrap();
                    assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 1);
                }
                assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0);
            }
        }
        assert_eq!(mmap_directory.get_cache_info().counters.miss_empty, 10);


        {
            // test weak miss
            // the first pass create the weak refs.
            for path in &paths {
                let _r = mmap_directory.open_read(path).unwrap();
            }
            // ... the second hits the weak refs.
            for path in &paths {
                let _r = mmap_directory.open_read(path).unwrap();
            }
            let cache_info = mmap_directory.get_cache_info();
            assert_eq!(cache_info.counters.miss_empty, 20);
            assert_eq!(cache_info.counters.miss_weak, 10);
        }

        {
            let mut saved_readmmaps = vec!();
            // Keeps reference alive
            for (i, path) in paths.iter().enumerate() {
                let r = mmap_directory.open_read(path).unwrap();
                saved_readmmaps.push(r);
                assert_eq!(mmap_directory.get_cache_info().mmapped.len(), i + 1);
            }
            let cache_info = mmap_directory.get_cache_info();
            println!("{:?}", cache_info);
            assert_eq!(cache_info.counters.miss_empty, 30);
            assert_eq!(cache_info.counters.miss_weak, 10);
            assert_eq!(cache_info.mmapped.len(), 10);

            for saved_readmmap in saved_readmmaps {
                assert_eq!(saved_readmmap.as_slice(), content);
            }
        }

        assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0);

    }

}
2 changes: 0 additions & 2 deletions src/lib.rs
@@ -28,8 +28,6 @@ extern crate log;

#[macro_use]
extern crate version;

#[macro_use]
extern crate fst;
extern crate byteorder;
extern crate memmap;
