Skip to content

Commit

Permalink
Facet collector
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Oct 18, 2017
1 parent fb42e9f commit 34b7d2b
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 46 deletions.
110 changes: 96 additions & 14 deletions src/collector/facet_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,59 @@ use schema::Field;
use std::cell::UnsafeCell;
use std::collections::HashMap;
use schema::Facet;
use std::borrow::BorrowMut;
use std::fmt::{self, Debug};

use DocId;
use Result;
use Score;
use SegmentReader;
use SegmentLocalId;

pub struct FacetCollector {
// local_counters: HashMap::new(),
#[derive(Clone)]
pub struct FacetCollectorBuilder {
field: Field,
ff_reader: Option<UnsafeCell<FacetReader>>,
local_counters: Vec<u64>,
global_counters: HashMap<Facet, u64>,
root_facet: Option<Facet>,
depth: Option<usize>,
}

impl FacetCollectorBuilder {
pub fn for_field(field: Field) -> FacetCollectorBuilder {
FacetCollectorBuilder {
field: field,
root_facet: None,
depth: None,
}
}

impl FacetCollector {
/// Creates a new facet collector for aggregating a given field.
pub fn new(field: Field) -> FacetCollector {
pub fn set_root_facet(mut self, facet: Facet) -> FacetCollectorBuilder {
self.root_facet = Some(facet);
self
}

pub fn set_depth(mut self, depth: usize) -> FacetCollectorBuilder {
self.depth = Some(depth);
self
}

pub fn build(self) -> FacetCollector {
FacetCollector {
field: field,
field: self.field,
ff_reader: None,
local_counters: vec![],
global_counters: HashMap::new(),
}
}
}

/// Collector counting the number of hits per facet for a given facet field.
///
/// Ordinal counts are accumulated per segment in `local_counters` and folded
/// into `global_counters` (keyed by the decoded `Facet`) when switching
/// segments / finishing.
pub struct FacetCollector {
    /// Facet field whose values are being counted.
    field: Field,
    /// Per-segment facet fast-field reader, installed in `set_segment`.
    /// NOTE(review): wrapped in `UnsafeCell` and dereferenced with `unsafe`
    /// in `collect` — presumably to obtain `&mut FacetReader` while other
    /// borrows of `self` are live; verify this is actually required.
    ff_reader: Option<UnsafeCell<FacetReader>>,
    /// Counts indexed by the current segment's term ordinal.
    local_counters: Vec<u64>,
    /// Aggregated counts across all segments, keyed by facet.
    global_counters: HashMap<Facet, u64>,
}

impl FacetCollector {
fn translate_ordinals(&mut self) {
for (term_ord, count) in self.local_counters.iter_mut().enumerate() {
if *count > 0 {
Expand All @@ -44,13 +69,16 @@ impl FacetCollector {
}
}
}
}

fn counts(mut self) -> HashMap<Facet, u64> {
self.translate_ordinals();
self.global_counters

impl Collector for FacetCollector
{
}
}


impl Collector for FacetCollector {
fn set_segment(&mut self, _: SegmentLocalId, reader: &SegmentReader) -> Result<()> {
self.translate_ordinals();
self.local_counters.clear();
Expand All @@ -62,7 +90,7 @@ impl Collector for FacetCollector
}

fn collect(&mut self, doc: DocId, _: Score) {
let mut facet_reader: &mut FacetReader =
let facet_reader: &mut FacetReader =
unsafe {
&mut *self.ff_reader
.as_ref()
Expand All @@ -79,3 +107,57 @@ impl Collector for FacetCollector
}


#[cfg(test)]
mod tests {

    use schema::SchemaBuilder;
    use core::Index;
    use schema::Document;
    use schema::Facet;
    use query::AllQuery;
    use super::FacetCollectorBuilder;

    /// Indexes 60 distinct 3-level facets (`/topX/midY/leafZ`), 10 docs per
    /// facet, then checks the facet collector reports a count of 10 for each.
    #[test]
    fn test_facet_collector() {
        let mut schema_builder = SchemaBuilder::new();
        let facet_field = schema_builder.add_facet_field("facet");
        let schema = schema_builder.build();
        let index = Index::create_in_ram(schema);


        let mut index_writer = index.writer(3_000_000).unwrap();
        // 3 * 4 * 5 = 60 distinct facet paths; `n` is decomposed in
        // mixed radix (3, 4, 5) to enumerate every (top, mid, leaf) combo.
        let num_facets: usize = 3 * 4 * 5;
        let facets: Vec<Facet> = (0..num_facets)
            .map(|mut n| {
                let top = n % 3;
                n /= 3;
                let mid = n % 4;
                n /= 4;
                let leaf = n % 5;
                Facet::from(&format!("/top{}/mid{}/leaf{}", top, mid, leaf))
            })
            .collect();
        // Round-robin over the facets so each one ends up on exactly 10 docs.
        for i in 0..num_facets * 10 {
            let mut doc = Document::new();
            doc.add_facet(facet_field, facets[i % num_facets].clone());
            index_writer.add_document(doc);
        }
        index_writer.commit().unwrap();


        index.load_searchers().unwrap() ;
        let searcher = index.searcher();

        // NOTE(review): `set_depth(1)` is requested here, yet the assertions
        // below look up full-depth (3-level) facets — confirm that `build()`
        // actually honors the depth setting, or drop it from this test.
        let mut facet_collector = FacetCollectorBuilder
            ::for_field(facet_field)
            .set_depth(1)
            .build();

        // AllQuery visits every document; `counts()` consumes the collector.
        searcher.search(&AllQuery, &mut facet_collector).unwrap();
        let counts = facet_collector.counts();
        for facet in facets {
            assert_eq!(*counts.get(&facet).unwrap(), 10u64);
        }
    }
}

2 changes: 1 addition & 1 deletion src/indexer/index_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ pub fn open_index_writer(
heap_size_in_bytes_per_thread: usize,
) -> Result<IndexWriter> {

if heap_size_in_bytes_per_thread <= HEAP_SIZE_LIMIT as usize {
if heap_size_in_bytes_per_thread < HEAP_SIZE_LIMIT as usize {
panic!(format!(
"The heap size per thread needs to be at least {}.",
HEAP_SIZE_LIMIT
Expand Down
68 changes: 68 additions & 0 deletions src/query/all_query.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use query::Query;
use query::Weight;
use query::Scorer;
use core::SegmentReader;
use Result;
use DocSet;
use Score;
use DocId;
use std::any::Any;
use core::Searcher;

#[derive(Debug)]
pub struct AllQuery;

impl Query for AllQuery {
fn as_any(&self) -> &Any {
self
}

fn weight(&self, searcher: &Searcher) -> Result<Box<Weight>> {
Ok(box AllWeight)
}
}


pub struct AllWeight;

impl Weight for AllWeight {
fn scorer<'a>(&'a self, reader: &'a SegmentReader) -> Result<Box<Scorer + 'a>> {
Ok(box AllScorer {
started: false,
doc: 0u32,
max_doc: reader.max_doc()
})
}
}

/// Scorer for [`AllQuery`]: a `DocSet` that emits every doc id of a segment
/// in increasing order, with a constant score.
pub struct AllScorer {
    /// `false` until the first call to `advance`, so that doc 0 is emitted
    /// (a plain pre-increment would skip it).
    started: bool,
    /// Current doc id; only meaningful after `advance` has returned `true`.
    doc: DocId,
    /// Exclusive upper bound: number of documents in the segment.
    max_doc: DocId,
}

impl DocSet for AllScorer {
    /// Advances the cursor by one document.
    ///
    /// The very first call only "starts" the cursor on doc 0; every later
    /// call increments the doc id. Returns `true` while the cursor still
    /// points at a valid document (`doc < max_doc`).
    fn advance(&mut self) -> bool {
        match self.started {
            true => self.doc += 1u32,
            false => self.started = true,
        }
        self.doc < self.max_doc
    }

    /// Current doc id; meaningful only after `advance` returned `true`.
    fn doc(&self) -> DocId {
        self.doc
    }

    /// Exact number of documents this set will emit.
    fn size_hint(&self) -> usize {
        self.max_doc as usize
    }
}

impl Scorer for AllScorer {
    /// `AllQuery` has no notion of relevance: every document scores 1.0.
    fn score(&self) -> Score {
        1.0f32
    }
}
2 changes: 2 additions & 0 deletions src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod occur_filter;
mod term_query;
mod query_parser;
mod phrase_query;
mod all_query;

pub use self::boolean_query::BooleanQuery;
pub use self::occur_filter::OccurFilter;
Expand All @@ -25,3 +26,4 @@ pub use self::scorer::EmptyScorer;
pub use self::scorer::Scorer;
pub use self::term_query::TermQuery;
pub use self::weight::Weight;
pub use self::all_query::{AllQuery, AllWeight, AllScorer};
1 change: 0 additions & 1 deletion src/query/term_query/term_scorer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ where
self.postings.doc()
}


fn size_hint(&self) -> usize {
self.postings.size_hint()
}
Expand Down
5 changes: 3 additions & 2 deletions src/schema/document.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ impl Document {
}

// TODO argument Into<Facet>
pub fn add_facet(&mut self, field: Field, path: &str) {
let facet = Facet::from_str(path);
pub fn add_facet<F>(&mut self, field: Field, path: F)
where Facet: From<F> {
let facet = Facet::from(path);
let value = Value::HierarchicalFacet(facet);
self.add(FieldValue::new(field, value));
}
Expand Down
56 changes: 30 additions & 26 deletions src/schema/facet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,45 @@ impl Facet {
pub fn from_path<Path>(path: Path) -> Facet
where
Path: IntoIterator,
Path::Item: Display {
Path::Item: ToString {
let mut facet_bytes: Vec<u8> = Vec::with_capacity(100);
let mut step_it = path.into_iter();
if let Some(step) = step_it.next() {
write!(&mut facet_bytes, "{}", step);
facet_bytes.extend_from_slice(step.to_string().as_bytes())
}
for step in step_it {
facet_bytes.push(SEP_BYTE);
write!(&mut facet_bytes, "{}", step);
facet_bytes.extend_from_slice(step.to_string().as_bytes());
}
Facet(facet_bytes)
}

pub fn from_str(path: &str) -> Facet {
// TODO check that path has the right format
pub(crate) fn inner_buffer_mut(&mut self) -> &mut Vec<u8> {
&mut self.0
}

pub fn to_string(&self) -> String {
format!("{}", self)
}

/// Returns the encoded byte slices of all ancestor facets, shortest first,
/// followed by the full encoded facet itself.
///
/// Each internal `SEP_BYTE` position marks the end of one ancestor prefix;
/// the final element is the whole buffer. E.g. for `/a/b/c` this yields the
/// encodings of `/a`, `/a/b`, and `/a/b/c`.
pub fn prefixes(&self) -> Vec<&[u8]> {
    let mut prefixes: Vec<&[u8]> = self.0
        .iter()
        .cloned()
        .enumerate()
        // keep the byte positions of the separators...
        .filter(|&(_, b)| b==SEP_BYTE)
        // ...and slice the buffer up to (excluding) each of them.
        .map(|(pos, _)| &self.0[0..pos])
        .collect();
    // The facet itself counts as its own (longest) prefix.
    prefixes.push(&self.0[..]);
    prefixes
}
}


impl<'a, T: ?Sized + AsRef<str>> From<&'a T> for Facet {

fn from(path_asref: &'a T) -> Facet {
let path: &str = path_asref.as_ref();
assert!(!path.contains(SEP));
let mut facet_encoded = Vec::new();
let mut state = State::Idle;
Expand All @@ -76,26 +100,6 @@ impl Facet {
}
Facet(facet_encoded)
}

pub(crate) fn inner_buffer_mut(&mut self) -> &mut Vec<u8> {
&mut self.0
}

pub fn to_string(&self) -> String {
format!("{}", self)
}

pub fn prefixes(&self) -> Vec<&[u8]> {
let mut prefixes: Vec<&[u8]> = self.0
.iter()
.cloned()
.enumerate()
.filter(|&(_, b)| b==SEP_BYTE)
.map(|(pos, _)| &self.0[0..pos])
.collect();
prefixes.push(&self.0[..]);
prefixes
}
}

impl BinarySerializable for Facet {
Expand Down Expand Up @@ -138,7 +142,7 @@ impl<'de> Deserialize<'de> for Facet {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> where
D: Deserializer<'de> {
<&'de str as Deserialize<'de>>::deserialize(deserializer)
.map(Facet::from_str)
.map(Facet::from)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/schema/field_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl FieldType {
))
}
FieldType::HierarchicalFacet => {
Ok(Value::HierarchicalFacet(Facet::from_str(field_text)))
Ok(Value::HierarchicalFacet(Facet::from(field_text)))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/termdict/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ mod tests {
use super::{TermDictionaryImpl, TermDictionaryBuilderImpl, TermStreamerImpl};
use directory::{RAMDirectory, Directory, ReadOnlySource};
use std::path::PathBuf;
use schema::{FieldType, Term, SchemaBuilder, Document, TEXT};
use schema::{FieldType, SchemaBuilder, Document, TEXT};
use core::Index;
use std::str;
use termdict::TermStreamer;
Expand Down

0 comments on commit 34b7d2b

Please sign in to comment.