diff --git a/src/core/index.rs b/src/core/index.rs index 54699a3fcf..cd2f84dd4c 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -43,7 +43,7 @@ impl fmt::Debug for Index { } } -type DirectoryPtr = Box; +pub type DirectoryPtr = Box; #[derive(Clone)] pub struct Index { @@ -127,7 +127,7 @@ impl Index { pub fn publish_segment(&mut self, segment: &Segment) -> Result<()> { { let mut meta_write = self.metas.write().unwrap(); - meta_write.segments.push(segment.segment_id.clone()); + meta_write.segments.push(segment.segment_id); } self.save_metas() } @@ -148,7 +148,6 @@ impl Index { } meta_write.segments.push(merged_segment.id()); } - // TODO use logs self.save_metas() } @@ -201,10 +200,11 @@ impl Index { pub fn save_metas(&mut self,) -> Result<()> { let mut w = Vec::new(); { - let metas_lock = self.metas.read().unwrap() ; + let metas_lock = self.metas.read().unwrap(); try!(write!(&mut w, "{}\n", json::as_pretty_json(&*metas_lock))); }; - try!(self.rw_directory()) + try!(self + .rw_directory()) .atomic_write(&META_FILEPATH, &w[..]) } } diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs index a403419f2f..3fcbea0653 100644 --- a/src/core/segment_reader.rs +++ b/src/core/segment_reader.rs @@ -107,7 +107,7 @@ impl SegmentReader { let positions_data = segment .open_read(SegmentComponent::POSITIONS) - .unwrap_or(ReadOnlySource::Anonymous(Vec::new())); + .unwrap_or(ReadOnlySource::empty()); let schema = segment.schema(); Ok(SegmentReader { diff --git a/src/datastruct/fstmap.rs b/src/datastruct/fstmap.rs index 8c77419ab5..6aead53823 100644 --- a/src/datastruct/fstmap.rs +++ b/src/datastruct/fstmap.rs @@ -61,7 +61,7 @@ pub struct FstMap { fn open_fst_index(source: ReadOnlySource) -> io::Result { Ok(fst::Map::from(match source { - ReadOnlySource::Anonymous(data) => try!(Fst::from_bytes(data).map_err(convert_fst_error)), + ReadOnlySource::Anonymous(data) => try!(Fst::from_shared_bytes(data.data, data.start, data.len).map_err(convert_fst_error)), ReadOnlySource::Mmap(mmap_readonly) => try!(Fst::from_mmap(mmap_readonly).map_err(convert_fst_error)), })) } @@ -141,6 +141,7 @@ mod tests { assert_eq!(keys.next().unwrap(), "abc".as_bytes()); assert_eq!(keys.next().unwrap(), "abcd".as_bytes()); assert_eq!(keys.next(), None); + } } diff --git a/src/directory/mod.rs b/src/directory/mod.rs index f9bc46f7ee..d554334662 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -2,16 +2,18 @@ mod mmap_directory; mod ram_directory; mod directory; mod read_only_source; +mod shared_vec_slice; use std::io::{Seek, Write}; use std::io; + use std::path::PathBuf; +pub use self::shared_vec_slice::SharedVecSlice; pub use self::read_only_source::ReadOnlySource; pub use self::directory::Directory; pub use self::ram_directory::RAMDirectory; pub use self::mmap_directory::MmapDirectory; -pub use self::ram_directory::SharedVec; //////////////////////////////////////// @@ -35,6 +37,7 @@ mod tests { use super::*; use std::path::Path; + use std::io::SeekFrom; #[test] fn test_ram_directory() { @@ -48,12 +51,13 @@ mod tests { test_directory(&mut mmap_directory); } - fn test_directory(directory: &mut Directory) { + fn test_directory_simple(directory: &mut Directory) { { let mut write_file = directory.open_write(Path::new("toto")).unwrap(); write_file.write_all(&[4]).unwrap(); write_file.write_all(&[3]).unwrap(); write_file.write_all(&[7,3,5]).unwrap(); + write_file.flush().unwrap(); } let read_file = directory.open_read(Path::new("toto")).unwrap(); let data: &[u8] = &*read_file; @@ -65,4 +69,30 @@ mod tests { assert_eq!(data[4], 5); } + + fn test_directory_seek(directory: &mut Directory) { + { + let mut write_file = directory.open_write(Path::new("toto_seek")).unwrap(); + write_file.write_all(&[4]).unwrap(); + write_file.write_all(&[3]).unwrap(); + write_file.write_all(&[7,3,5]).unwrap(); + write_file.seek(SeekFrom::Start(0)).unwrap(); + write_file.write_all(&[3,1]).unwrap(); + write_file.flush().unwrap(); + } + let read_file = directory.open_read(Path::new("toto_seek")).unwrap(); + let data: &[u8] = &*read_file; + assert_eq!(data.len(), 5); + assert_eq!(data[0], 3); + assert_eq!(data[1], 1); + assert_eq!(data[2], 7); + assert_eq!(data[3], 3); + assert_eq!(data[4], 5); + } + + fn test_directory(directory: &mut Directory) { + test_directory_simple(directory); + test_directory_seek(directory); + } + } diff --git a/src/directory/ram_directory.rs b/src/directory/ram_directory.rs index 6a3f00f40f..faae203dfd 100644 --- a/src/directory/ram_directory.rs +++ b/src/directory/ram_directory.rs @@ -8,40 +8,81 @@ use std::path::{Path, PathBuf}; use directory::OpenError; use directory::WritePtr; use std::result; +use super::SharedVecSlice; use Result; -#[derive(Clone)] -pub struct SharedVec(Arc>>>); - -pub struct RAMDirectory { - fs: HashMap, +struct VecWriter { + path: PathBuf, + shared_directory: InnerDirectory, + data: Cursor>, + is_flushed: bool, } -impl SharedVec { - pub fn new() -> SharedVec { - SharedVec(Arc::new( RwLock::new(Cursor::new(Vec::new())) )) + +impl Drop for VecWriter { + fn drop(&mut self) { + if !self.is_flushed { + panic!("You forgot to flush {:?} before its writter got Drop. Do not rely on drop.", self.path) + } } - - pub fn copy_vec(&self,) -> Vec { - self.0.read().unwrap().clone().into_inner() +} + +impl Seek for VecWriter { + fn seek(&mut self, pos: SeekFrom) -> io::Result { + self.data.seek(pos) } } -impl Write for SharedVec { +impl Write for VecWriter { fn write(&mut self, buf: &[u8]) -> io::Result { - try!(self.0.write().unwrap().write(buf)); + self.is_flushed = false; + try!(self.data.write(buf)); Ok(buf.len()) } + fn flush(&mut self) -> io::Result<()> { - Ok(()) + self.is_flushed = true; + self.shared_directory.write(self.path.clone(), self.data.get_ref()) } } -impl Seek for SharedVec { - fn seek(&mut self, pos: SeekFrom) -> io::Result { - self.0.write().unwrap().seek(pos) +#[derive(Clone)] +struct InnerDirectory(Arc>>>>); + +impl InnerDirectory { + + fn new() -> InnerDirectory { + InnerDirectory(Arc::new(RwLock::new(HashMap::new()))) + } + + fn write(&self, path: PathBuf, data: &Vec) -> io::Result<()> { + let mut map = try!( + self.0 + .write() + .map_err(|_| io::Error::new(io::ErrorKind::Other, format!("Failed to lock the directory, when trying to write {:?}", path))) + ); + map.insert(path, Arc::new(data.clone())); + Ok(()) } + + fn open_read(&self, path: &Path) -> result::Result { + self.0 + .read() + .map_err(|_| { + let io_err = io::Error::new(io::ErrorKind::Other, format!("Failed to read lock for the directory, when trying to read {:?}", path)); + OpenError::IOError(io_err) + }) + .and_then(|readable_map| { + readable_map + .get(path) + .ok_or_else(|| OpenError::FileDoesNotExist(PathBuf::from(path))) + .map(|data| { + ReadOnlySource::Anonymous(SharedVecSlice::new(data.clone())) + }) + }) + } + } impl fmt::Debug for RAMDirectory { @@ -53,33 +94,37 @@ impl fmt::Debug for RAMDirectory { impl RAMDirectory { pub fn create() -> RAMDirectory { RAMDirectory { - fs: HashMap::new() + fs: InnerDirectory::new() } } } + +pub struct RAMDirectory { + fs: InnerDirectory, +} + impl Directory for RAMDirectory { fn open_read(&self, path: &Path) -> result::Result { - match self.fs.get(path) { - Some(ref data) => { - let data_copy = data.copy_vec(); - Ok(ReadOnlySource::Anonymous(data_copy)) - }, - None => { - Err(OpenError::FileDoesNotExist(PathBuf::from(path))) - } - } + self.fs.open_read(path) } + fn open_write(&mut self, path: &Path) -> Result { - let full_path = PathBuf::from(&path); - let data = SharedVec::new(); - self.fs.insert(full_path, data.clone()); - Ok(Box::new(data)) + let mut vec_writer = VecWriter { + path: PathBuf::from(path), + data: Cursor::new(Vec::new()), + shared_directory: self.fs.clone(), + is_flushed: false, + }; + // force the creation of the file to mimick the MMap directory. + try!(vec_writer.flush()); + Ok(Box::new(vec_writer)) } fn atomic_write(&mut self, path: &Path, data: &[u8]) -> Result<()> { let mut write = try!(self.open_write(path)); try!(write.write_all(data)); + try!(write.flush()); Ok(()) } diff --git a/src/directory/read_only_source.rs b/src/directory/read_only_source.rs index 9af50c5a03..0733a35130 100644 --- a/src/directory/read_only_source.rs +++ b/src/directory/read_only_source.rs @@ -1,6 +1,7 @@ use fst::raw::MmapReadOnly; use std::ops::Deref; use std::io::Cursor; +use super::SharedVecSlice; //////////////////////////////////////// // Read only source. @@ -8,7 +9,7 @@ use std::io::Cursor; pub enum ReadOnlySource { Mmap(MmapReadOnly), - Anonymous(Vec), + Anonymous(SharedVecSlice), } impl Deref for ReadOnlySource { @@ -25,12 +26,18 @@ impl ReadOnlySource { self.as_slice().len() } + pub fn empty() -> ReadOnlySource { + ReadOnlySource::Anonymous(SharedVecSlice::empty()) + } + pub fn as_slice(&self,) -> &[u8] { match *self { ReadOnlySource::Mmap(ref mmap_read_only) => unsafe { mmap_read_only.as_slice() }, - ReadOnlySource::Anonymous(ref shared_vec) => shared_vec.as_slice(), + ReadOnlySource::Anonymous(ref shared_vec) => { + shared_vec.as_slice() + }, } } @@ -45,8 +52,7 @@ impl ReadOnlySource { ReadOnlySource::Mmap(sliced_mmap) } ReadOnlySource::Anonymous(ref shared_vec) => { - let sliced_data: Vec = Vec::from(&shared_vec[from_offset..to_offset]); - ReadOnlySource::Anonymous(sliced_data) + ReadOnlySource::Anonymous(shared_vec.slice(from_offset, to_offset)) }, } } diff --git a/src/directory/shared_vec_slice.rs b/src/directory/shared_vec_slice.rs new file mode 100644 index 0000000000..e51c993486 --- /dev/null +++ b/src/directory/shared_vec_slice.rs @@ -0,0 +1,36 @@ +use std::sync::Arc; + +#[derive(Clone)] +pub struct SharedVecSlice { + pub data: Arc>, + pub start: usize, + pub len: usize +} + +impl SharedVecSlice { + + pub fn empty() -> SharedVecSlice { + SharedVecSlice::new(Arc::new(Vec::new())) + } + + pub fn new(data: Arc>) -> SharedVecSlice { + let data_len = data.len(); + SharedVecSlice { + data: data, + start: 0, + len: data_len, + } + } + + pub fn as_slice(&self,) -> &[u8] { + &self.data[self.start..self.start + self.len] + } + + pub fn slice(&self, from_offset: usize, to_offset:usize) -> SharedVecSlice { + SharedVecSlice { + data: self.data.clone(), + start: self.start + from_offset, + len: to_offset - from_offset, + } + } +} diff --git a/src/fastfield/serializer.rs b/src/fastfield/serializer.rs index ca46a84d50..e23614af5a 100644 --- a/src/fastfield/serializer.rs +++ b/src/fastfield/serializer.rs @@ -88,6 +88,7 @@ impl FastFieldSerializer { self.written_size += try!(self.fields.serialize(&mut self.write)); try!(self.write.seek(SeekFrom::Start(0))); try!((header_offset as u32).serialize(&mut self.write)); + try!(self.write.flush()); Ok(self.written_size) } } diff --git a/src/postings/mod.rs b/src/postings/mod.rs index c4d55bd0fd..4272c363aa 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -52,11 +52,12 @@ mod tests { let mut posting_serializer = PostingsSerializer::open(&segment).unwrap(); let term = Term::from_field_text(text_field, "abc"); posting_serializer.new_term(&term, 3).unwrap(); - for _ in 0..3 { - let a = vec!(1,2,3,2); - posting_serializer.write_doc(0, 2, &a).unwrap(); + for doc_id in 0u32..3u32 { + let positions = vec!(1,2,3,2); + posting_serializer.write_doc(doc_id, 2, &positions).unwrap(); } posting_serializer.close_term().unwrap(); + posting_serializer.close().unwrap(); let read = segment.open_read(SegmentComponent::POSITIONS).unwrap(); assert_eq!(read.len(), 13); } diff --git a/src/postings/serializer.rs b/src/postings/serializer.rs index 3da98e4d6f..83cf8ac8b2 100644 --- a/src/postings/serializer.rs +++ b/src/postings/serializer.rs @@ -31,6 +31,7 @@ pub struct PostingsSerializer { position_deltas: Vec, schema: Schema, text_indexing_options: TextIndexingOptions, + term_open: bool, } impl PostingsSerializer { @@ -55,6 +56,7 @@ impl PostingsSerializer { position_deltas: Vec::new(), schema: schema, text_indexing_options: TextIndexingOptions::Unindexed, + term_open: false, }) } @@ -76,7 +78,10 @@ impl PostingsSerializer { } pub fn new_term(&mut self, term: &Term, doc_freq: DocId) -> io::Result<()> { - try!(self.close_term()); + if self.term_open { + panic!("Called new_term, while the previous term was not closed."); + } + self.term_open = true; self.load_indexing_options(term.get_field()); self.doc_ids.clear(); self.last_doc_id_encoded = 0; @@ -92,14 +97,22 @@ impl PostingsSerializer { } pub fn close_term(&mut self,) -> io::Result<()> { - if !self.doc_ids.is_empty() { - { - let block_encoded = self.block_encoder.compress_vint_sorted(&self.doc_ids, self.last_doc_id_encoded); - self.written_bytes_postings += block_encoded.len(); - try!(self.postings_write.write_all(block_encoded)); - } - if self.text_indexing_options.is_termfreq_enabled() { + if self.term_open { + if !self.doc_ids.is_empty() { + // we have doc ids waiting to be written + // this happens when the number of doc ids is + // not a perfect multiple of our block size. + // + // In that case, the remaining part is encoded + // using variable int encoding. { + let block_encoded = self.block_encoder.compress_vint_sorted(&self.doc_ids, self.last_doc_id_encoded); + self.written_bytes_postings += block_encoded.len(); + try!(self.postings_write.write_all(block_encoded)); + self.doc_ids.clear(); + } + // ... Idem for term frequencies + if self.text_indexing_options.is_termfreq_enabled() { let block_encoded = self.block_encoder.compress_vint_unsorted(&self.term_freqs[..]); for num in block_encoded { self.written_bytes_postings += try!(num.serialize(&mut self.postings_write)); @@ -107,14 +120,16 @@ impl PostingsSerializer { self.term_freqs.clear(); } } - self.doc_ids.clear(); - } - if self.text_indexing_options.is_position_enabled() { - self.written_bytes_positions += try!(VInt(self.position_deltas.len() as u64).serialize(&mut self.positions_write)); - let positions_encoded: &[u8] = self.positions_encoder.compress_unsorted(&self.position_deltas[..]); - try!(self.positions_write.write_all(positions_encoded)); - self.written_bytes_positions += positions_encoded.len(); - self.position_deltas.clear(); + // On the other hand, positions are entirely buffered until the + // end of the term, at which point they are compressed and written. + if self.text_indexing_options.is_position_enabled() { + self.written_bytes_positions += try!(VInt(self.position_deltas.len() as u64).serialize(&mut self.positions_write)); + let positions_encoded: &[u8] = self.positions_encoder.compress_unsorted(&self.position_deltas[..]); + try!(self.positions_write.write_all(positions_encoded)); + self.written_bytes_positions += positions_encoded.len(); + self.position_deltas.clear(); + } + self.term_open = false; } Ok(()) } diff --git a/src/postings/writer.rs b/src/postings/writer.rs index 3293009e4b..1726b9c10e 100644 --- a/src/postings/writer.rs +++ b/src/postings/writer.rs @@ -119,6 +119,7 @@ impl PostingsWriter for SpecializedPostingsWriter let term_docfreq = term_postings_writer.doc_freq(); try!(serializer.new_term(term, term_docfreq)); try!(term_postings_writer.serialize(serializer)); + try!(serializer.close_term()); } Ok(()) } diff --git a/src/query/daat_multiterm_scorer.rs b/src/query/daat_multiterm_scorer.rs index 65a6ba78aa..798942e4a3 100644 --- a/src/query/daat_multiterm_scorer.rs +++ b/src/query/daat_multiterm_scorer.rs @@ -204,9 +204,10 @@ mod tests { use postings::{DocSet, VecPostings}; use query::TfIdf; use query::Scorer; - use directory::ReadOnlySource; - use directory::SharedVec; + use directory::Directory; + use directory::RAMDirectory; use schema::Field; + use std::path::Path; use query::Occur; use fastfield::{U32FastFieldReader, U32FastFieldWriter, FastFieldSerializer}; @@ -216,12 +217,16 @@ mod tests { for val in vals { u32_field_writer.add_val(val); } - let data = SharedVec::new(); - let write: Box = Box::new(data.clone()); - let mut serializer = FastFieldSerializer::new(write).unwrap(); - u32_field_writer.serialize(&mut serializer).unwrap(); - serializer.close().unwrap(); - U32FastFieldReader::open(ReadOnlySource::Anonymous(data.copy_vec())).unwrap() + let path = Path::new("some_path"); + let mut directory = RAMDirectory::create(); + { + let write = directory.open_write(&path).unwrap(); + let mut serializer = FastFieldSerializer::new(write).unwrap(); + u32_field_writer.serialize(&mut serializer).unwrap(); + serializer.close().unwrap(); + } + let read = directory.open_read(&path).unwrap(); + U32FastFieldReader::open(read).unwrap() } fn abs_diff(left: f32, right: f32) -> f32 {