support concurrent merge scheduler
gaolei authored and sunxiaoguang committed May 30, 2019
1 parent 67fd8ad commit d72d5bb
Showing 27 changed files with 999 additions and 243 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -23,6 +23,7 @@ flate2 = "1.0.2"
lazy_static = "1.0"
log = "0.4"
memmap = "0.6"
num_cpus = "1.10.0"
rand = "0.5"
regex = "0.2"
serde = "1.0"
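
The new num_cpus dependency suggests the concurrent merge scheduler sizes its thread budget from the host's core count. A minimal sketch of that kind of heuristic, loosely following Lucene's ConcurrentMergeScheduler default of max(1, min(4, cores / 2)); the function name and exact formula are assumptions for illustration, not the code added by this commit:

```rust
// Hypothetical helper: derive a default merge-thread count from the CPU count.
// The formula mirrors Lucene's ConcurrentMergeScheduler heuristic and only
// illustrates how the num_cpus crate might be consumed by this commit.
fn default_max_merge_threads() -> usize {
    let cores = num_cpus::get();
    std::cmp::max(1, std::cmp::min(4, cores / 2))
}

fn main() {
    println!("default merge threads: {}", default_max_merge_threads());
}
```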
6 changes: 2 additions & 4 deletions src/core/codec/lucene60/points_writer.rs
@@ -76,9 +76,7 @@ impl<D: Directory, DW: Directory, C: Codec> Lucene60PointsWriter<D, DW, C> {
}
}

impl<D: Directory, DW: Directory + 'static, C: Codec> PointsWriter
for Lucene60PointsWriter<D, DW, C>
{
impl<D: Directory, DW: Directory, C: Codec> PointsWriter for Lucene60PointsWriter<D, DW, C> {
fn write_field<P, MP>(
&mut self,
field_info: &FieldInfo,
@@ -279,7 +277,7 @@ impl<'a, D: Directory> ValuesIntersectVisitor<'a, D> {
}
}

impl<'a, D: Directory + 'static> IntersectVisitor for ValuesIntersectVisitor<'a, D> {
impl<'a, D: Directory> IntersectVisitor for ValuesIntersectVisitor<'a, D> {
fn visit(&mut self, _doc_id: DocId) -> Result<()> {
bail!(IllegalState("".into()))
}
116 changes: 90 additions & 26 deletions src/core/index/bufferd_updates.rs
@@ -340,11 +340,16 @@ impl<C: Codec> BufferedUpdatesStream<C> {
self.bytes_used.load(Ordering::Acquire)
}

pub fn apply_deletes_and_updates<D: Directory, MS: MergeScheduler, MP: MergePolicy>(
pub fn apply_deletes_and_updates<D, MS, MP>(
&self,
pool: &ReaderPool<D, C, MS, MP>,
infos: &[Arc<SegmentCommitInfo<D, C>>],
) -> Result<ApplyDeletesResult<D, C>> {
) -> Result<ApplyDeletesResult<D, C>>
where
D: Directory + Send + Sync + 'static,
MS: MergeScheduler,
MP: MergePolicy,
{
let _l = self.lock.lock().unwrap();
let updates_stream = unsafe {
let stream = self as *const BufferedUpdatesStream<C> as *mut BufferedUpdatesStream<C>;
@@ -371,12 +376,17 @@ impl<C: Codec> BufferedUpdatesStream<C> {

/// Resolves the buffered deleted Term/Query/docIDs into actual deleted
/// doc_ids in the live_docs MutableBits for each SegmentReader
fn do_apply_deletes_and_updates<D: Directory, MS: MergeScheduler, MP: MergePolicy>(
fn do_apply_deletes_and_updates<D, MS, MP>(
&mut self,
pool: &ReaderPool<D, C, MS, MP>,
infos: &[Arc<SegmentCommitInfo<D, C>>],
seg_states: &mut Vec<SegmentState<D, C, MS, MP>>,
) -> Result<Option<ApplyDeletesResult<D, C>>> {
) -> Result<Option<ApplyDeletesResult<D, C>>>
where
D: Directory + Send + Sync + 'static,
MS: MergeScheduler,
MP: MergePolicy,
{
let mut coalesce_updates = CoalescedUpdates::default();

// We only init these on demand, when we find our first deletes that need to be applied:
@@ -517,10 +527,15 @@ impl<C: Codec> BufferedUpdatesStream<C> {
}

/// Delete by query
fn apply_query_deletes<'a, D: Directory + 'static, MS: MergeScheduler, MP: MergePolicy>(
fn apply_query_deletes<'a, D, MS, MP>(
queries: impl Iterator<Item = &'a (Arc<dyn Query<C>>, DocId)>,
seg_state: &mut SegmentState<D, C, MS, MP>,
) -> Result<u64> {
) -> Result<u64>
where
D: Directory + Send + Sync + 'static,
MS: MergeScheduler,
MP: MergePolicy,
{
let mut del_count: u64 = 0;
let mut rld = seg_state.rld.inner.lock()?;
let mut searcher = DefaultIndexSearcher::new(Arc::clone(rld.reader()));
@@ -554,11 +569,16 @@ impl<C: Codec> BufferedUpdatesStream<C> {
}

/// Merge sorts the deleted terms and all segments to resolve terms to doc_ids for deletion.
fn apply_term_deletes<D: Directory + 'static, MS: MergeScheduler, MP: MergePolicy>(
fn apply_term_deletes<D, MS, MP>(
&mut self,
updates: &CoalescedUpdates<C>,
seg_states: &mut [SegmentState<D, C, MS, MP>],
) -> Result<u64> {
) -> Result<u64>
where
D: Directory + Send + Sync + 'static,
MS: MergeScheduler,
MP: MergePolicy,
{
let start = Instant::now();
let num_readers = seg_states.len();

@@ -714,12 +734,17 @@ impl<C: Codec> BufferedUpdatesStream<C> {
}

/// Opens SegmentReader and inits SegmentState for each segment.
fn open_segment_states<D: Directory, MS: MergeScheduler, MP: MergePolicy>(
fn open_segment_states<D, MS, MP>(
&self,
pool: &ReaderPool<D, C, MS, MP>,
infos: &[Arc<SegmentCommitInfo<D, C>>],
seg_states: &mut Vec<SegmentState<D, C, MS, MP>>,
) -> Result<()> {
) -> Result<()>
where
D: Directory + Send + Sync + 'static,
MS: MergeScheduler,
MP: MergePolicy,
{
for info in infos {
match SegmentState::new(pool, info) {
Ok(state) => seg_states.push(state),
@@ -737,13 +762,18 @@ impl<C: Codec> BufferedUpdatesStream<C> {
}

/// Close segment states previously opened with openSegmentStates.
fn close_segment_states<D: Directory, MS: MergeScheduler, MP: MergePolicy>(
fn close_segment_states<D, MS, MP>(
&mut self,
pool: &ReaderPool<D, C, MS, MP>,
seg_states: &mut [SegmentState<D, C, MS, MP>],
success: bool,
gen: i64,
) -> Result<ApplyDeletesResult<D, C>> {
) -> Result<ApplyDeletesResult<D, C>>
where
D: Directory + Send + Sync + 'static,
MS: MergeScheduler,
MP: MergePolicy,
{
let mut first_err = Ok(ApplyDeletesResult::new(false, 0, vec![]));
let mut total_del_count = 0;
let mut all_deleted = vec![];
@@ -874,7 +904,12 @@ impl<D: Directory, C: Codec> ApplyDeletesResult<D, C> {
}
}

struct SegmentState<D: Directory + 'static, C: Codec, MS: MergeScheduler, MP: MergePolicy> {
struct SegmentState<
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
> {
del_gen: i64,
rld: Arc<ReadersAndUpdates<D, C, MS, MP>>,
start_del_count: usize,
@@ -884,8 +919,12 @@ struct SegmentState<D: Directory + 'static, C: Codec, MS: MergeScheduler, MP: Me
any: bool,
}

impl<D: Directory + 'static, C: Codec, MS: MergeScheduler, MP: MergePolicy>
SegmentState<D, C, MS, MP>
impl<D, C, MS, MP> SegmentState<D, C, MS, MP>
where
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
{
fn new(pool: &ReaderPool<D, C, MS, MP>, info: &Arc<SegmentCommitInfo<D, C>>) -> Result<Self> {
let rld: Arc<ReadersAndUpdates<D, C, MS, MP>> = pool.get_or_create(info)?;
@@ -909,22 +948,35 @@ impl<D: Directory + 'static, C: Codec, MS: MergeScheduler, MP: MergePolicy>
}
}

struct SegmentStateRef<D: Directory + 'static, C: Codec, MS: MergeScheduler, MP: MergePolicy> {
struct SegmentStateRef<
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
> {
states: *const [SegmentState<D, C, MS, MP>],
index: usize,
}

impl<D: Directory + 'static, C: Codec, MS: MergeScheduler, MP: MergePolicy>
SegmentStateRef<D, C, MS, MP>
impl<D, C, MS, MP> SegmentStateRef<D, C, MS, MP>
where
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
{
fn new(states: &[SegmentState<D, C, MS, MP>], index: usize) -> Self {
Self { states, index }
}
}

/// Used for iterating terms on a binary heap
impl<D: Directory + 'static, C: Codec, MS: MergeScheduler, MP: MergePolicy> Ord
for SegmentStateRef<D, C, MS, MP>
impl<D, C, MS, MP> Ord for SegmentStateRef<D, C, MS, MP>
where
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
{
fn cmp(&self, other: &Self) -> CmpOrdering {
// reversed order
@@ -942,21 +994,33 @@ impl<D: Directory + 'static, C: Codec, MS: MergeScheduler, MP: MergePolicy> Ord
}
}

impl<D: Directory + 'static, C: Codec, MS: MergeScheduler, MP: MergePolicy> PartialOrd
for SegmentStateRef<D, C, MS, MP>
impl<D, C, MS, MP> PartialOrd for SegmentStateRef<D, C, MS, MP>
where
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
{
fn partial_cmp(&self, other: &Self) -> Option<CmpOrdering> {
Some(self.cmp(other))
}
}

impl<D: Directory + 'static, C: Codec, MS: MergeScheduler, MP: MergePolicy> Eq
for SegmentStateRef<D, C, MS, MP>
impl<D, C, MS, MP> Eq for SegmentStateRef<D, C, MS, MP>
where
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
{
}

impl<D: Directory + 'static, C: Codec, MS: MergeScheduler, MP: MergePolicy> PartialEq
for SegmentStateRef<D, C, MS, MP>
impl<D, C, MS, MP> PartialEq for SegmentStateRef<D, C, MS, MP>
where
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
{
fn eq(&self, other: &Self) -> bool {
unsafe {
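
The bound change repeated throughout this file, from `D: Directory + 'static` to `D: Directory + Send + Sync + 'static`, is what allows the per-segment state to be shared with a concurrent merge scheduler's background threads. A minimal sketch of why the extra bounds are forced, using illustrative stand-in types rather than the actual rucene ones:

```rust
use std::sync::Arc;
use std::thread;

// Stand-in for per-segment state handed to a background merge; the real
// rucene types (ReadersAndUpdates, SegmentState, ...) are not shown here.
struct SharedSegmentState<D> {
    directory: Arc<D>,
}

// thread::spawn requires its closure to be Send + 'static, and moving an
// Arc<SharedSegmentState<D>> into that closure additionally requires D to be
// Send + Sync. Hence the widened `D: Directory + Send + Sync + 'static` bounds.
fn spawn_background_merge<D>(state: Arc<SharedSegmentState<D>>) -> thread::JoinHandle<()>
where
    D: Send + Sync + 'static,
{
    thread::spawn(move || {
        // ... run the merge against `state.directory` ...
        let _ = Arc::strong_count(&state.directory);
    })
}

// Hypothetical trivial directory type, just to exercise the bounds.
struct DummyDirectory;

fn main() {
    let state = Arc::new(SharedSegmentState {
        directory: Arc::new(DummyDirectory),
    });
    spawn_background_merge(state).join().unwrap();
}
```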
12 changes: 6 additions & 6 deletions src/core/index/directory_reader.rs
@@ -59,7 +59,7 @@ pub fn index_exist<D: Directory>(directory: &D) -> Result<bool> {
}

pub struct StandardDirectoryReader<
D: Directory + 'static,
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
@@ -77,7 +77,7 @@ pub struct StandardDirectoryReader<

impl<D, C, MS, MP> StandardDirectoryReader<D, C, MS, MP>
where
D: Directory + 'static,
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
@@ -321,7 +321,7 @@

impl<D, C, MS, MP> IndexReader for StandardDirectoryReader<D, C, MS, MP>
where
D: Directory + 'static,
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
@@ -389,7 +389,7 @@ where

impl<D, C, MS, MP> fmt::Debug for StandardDirectoryReader<D, C, MS, MP>
where
D: Directory,
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
@@ -411,7 +411,7 @@ where

impl<D, C, MS, MP> AsRef<IndexReader<Codec = C>> for StandardDirectoryReader<D, C, MS, MP>
where
D: Directory,
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
@@ -423,7 +423,7 @@ where

impl<D, C, MS, MP> Drop for StandardDirectoryReader<D, C, MS, MP>
where
D: Directory,
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
28 changes: 14 additions & 14 deletions src/core/index/doc_consumer.rs
@@ -69,7 +69,7 @@ pub trait DocConsumer<D: Directory, C: Codec> {
}

pub struct DefaultIndexingChain<
D: Directory + 'static,
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
@@ -97,7 +97,7 @@ pub struct DefaultIndexingChain<

impl<D, C, MS, MP> Default for DefaultIndexingChain<D, C, MS, MP>
where
D: Directory + 'static,
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
@@ -121,7 +121,7 @@ where

impl<D, C, MS, MP> DefaultIndexingChain<D, C, MS, MP>
where
D: Directory + 'static,
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
@@ -164,10 +164,7 @@ where
}

/// Writes all buffered points.
fn write_points<DW: Directory + 'static>(
&mut self,
state: &SegmentWriteState<D, DW, C>,
) -> Result<()> {
fn write_points<DW: Directory>(&mut self, state: &SegmentWriteState<D, DW, C>) -> Result<()> {
let mut points_writer = None;
for per_field in &mut self.field_hash {
if per_field.point_values_writer.is_some() {
@@ -627,7 +624,7 @@ where

impl<D, C, MS, MP> DocConsumer<D, C> for DefaultIndexingChain<D, C, MS, MP>
where
D: Directory + 'static,
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
@@ -671,10 +668,7 @@ where
res
}

fn flush<DW: Directory + 'static>(
&mut self,
state: &mut SegmentWriteState<D, DW, C>,
) -> Result<()> {
fn flush<DW: Directory>(&mut self, state: &mut SegmentWriteState<D, DW, C>) -> Result<()> {
debug_assert!(self.inited);
// NOTE: caller (DocumentsWriterPerThread) handles
// aborting on any exception from this method
@@ -838,13 +832,19 @@ impl<T: TermsHashPerField> PerField<T> {
Ok(())
}

fn invert<D: Directory, C: Codec, MS: MergeScheduler, MP: MergePolicy>(
fn invert<D, C, MS, MP>(
&mut self,
field: &mut impl Fieldable,
doc_state: &DocState,
first: bool,
index_chain: &mut DefaultIndexingChain<D, C, MS, MP>,
) -> Result<()> {
) -> Result<()>
where
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
{
if first {
// First time we're seeing this field (indexed) in
// this document:
