Skip to content

Commit

Permalink
work around starvation issue where multiple indexing threads flush se…
Browse files Browse the repository at this point in the history
…gments faster than the single thread can publish them

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/branches/lucene_solr_4_0@1394710 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
mikemccand committed Oct 5, 2012
1 parent 76f67b0 commit 254d2ad
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 8 deletions.
17 changes: 15 additions & 2 deletions lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
Expand Up @@ -432,7 +432,20 @@ private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOExcepti
* Now we are done and try to flush the ticket queue if the head of the
* queue has already finished the flush.
*/
ticketQueue.tryPurge(this);
if (ticketQueue.getTicketCount() >= perThreadPool.getActiveThreadState()) {
// This means there is a backlog: the one
// thread in innerPurge can't keep up with all
// other threads flushing segments. In this case
// we forcefully stall the producers. But really
// this means we have a concurrency issue
// (TestBagOfPostings can provoke this):
// publishing a flush segment is too heavy today
// (it builds CFS, writes .si, etc.) ... we need
// to make those ops concurrent too:
ticketQueue.forcePurge(this);
} else {
ticketQueue.tryPurge(this);
}
} finally {
flushControl.doAfterFlush(flushingDWPT);
flushingDWPT.checkAndResetHasAborted();
Expand Down Expand Up @@ -496,7 +509,7 @@ private void publishFlushedSegment(FlushedSegment newSegment, FrozenBufferedDele
final SegmentInfoPerCommit segInfo = indexWriter.prepareFlushedSegment(newSegment);
final BufferedDeletes deletes = newSegment.segmentDeletes;
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", Thread.currentThread().getName() + ": publishFlushedSegment seg-private deletes=" + deletes);
infoStream.message("DW", "publishFlushedSegment seg-private deletes=" + deletes);
}
FrozenBufferedDeletes packet = null;
if (deletes != null && deletes.any()) {
Expand Down
Expand Up @@ -41,8 +41,7 @@ void addDeletesAndPurge(DocumentsWriter writer,
// a window for #anyChanges to fail
boolean success = false;
try {
queue
.add(new GlobalDeletesTicket(deleteQueue.freezeGlobalBuffer(null)));
queue.add(new GlobalDeletesTicket(deleteQueue.freezeGlobalBuffer(null)));
success = true;
} finally {
if (!success) {
Expand Down Expand Up @@ -111,15 +110,15 @@ private void innerPurge(DocumentsWriter writer) throws IOException {
if (canPublish) {
try {
/*
* if we bock on publish -> lock IW -> lock BufferedDeletes we don't block
* if we block on publish -> lock IW -> lock BufferedDeletes we don't block
* concurrent segment flushes just because they want to append to the queue.
* the downside is that we need to force a purge on fullFlush since ther could
* be a ticket still in the queue.
*/
head.publish(writer);
} finally {
synchronized (this) {
// finally remove the publised ticket from the queue
// finally remove the published ticket from the queue
final FlushTicket poll = queue.poll();
ticketCount.decrementAndGet();
assert poll == head;
Expand Down Expand Up @@ -152,6 +151,10 @@ void tryPurge(DocumentsWriter writer) throws IOException {
}
}

public int getTicketCount() {
return ticketCount.get();
}

synchronized void clear() {
queue.clear();
ticketCount.set(0);
Expand Down Expand Up @@ -186,7 +189,7 @@ protected boolean canPublish() {
return true;
}
}

static final class SegmentFlushTicket extends FlushTicket {
private FlushedSegment segment;
private boolean failed = false;
Expand Down
Expand Up @@ -508,8 +508,14 @@ public synchronized IndexInput openInput(String name, IOContext context) throws
final IndexInput ii;
int randomInt = randomState.nextInt(500);
if (randomInt == 0) {
if (LuceneTestCase.VERBOSE) {
System.out.println("MockDirectoryWrapper: using SlowClosingMockIndexInputWrapper for file " + name);
}
ii = new SlowClosingMockIndexInputWrapper(this, name, delegateInput);
} else if (randomInt == 1) {
if (LuceneTestCase.VERBOSE) {
System.out.println("MockDirectoryWrapper: using SlowOpeningMockIndexInputWrapper for file " + name);
}
ii = new SlowOpeningMockIndexInputWrapper(this, name, delegateInput);
} else {
ii = new MockIndexInputWrapper(this, name, delegateInput);
Expand Down Expand Up @@ -660,7 +666,6 @@ public synchronized void close() throws IOException {
endFiles = endSet.toArray(new String[0]);

if (!Arrays.equals(startFiles, endFiles)) {
StringBuilder sb = new StringBuilder();
List<String> removed = new ArrayList<String>();
for(String fileName : startFiles) {
if (!endSet.contains(fileName)) {
Expand Down

0 comments on commit 254d2ad

Please sign in to comment.