Skip to content

Commit

Permalink
Clean up temporary files created during segment merge in case segment …
Browse files Browse the repository at this point in the history
…merge fails (#6324)

Signed-off-by: Rishav Sagar <rissag@amazon.com>
  • Loading branch information
RS146BIJAY committed Feb 15, 2023
1 parent 1540e00 commit 6b35c32
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 7 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Segment Replication] Add primary weight factor for balanced primary distribution ([#6017](https://github.com/opensearch-project/OpenSearch/pull/6017))
- Add a setting to control auto release of OpenSearch managed index creation block ([#6277](https://github.com/opensearch-project/OpenSearch/pull/6277))
- Fix timeout error when adding a document to an index with extension running ([#6275](https://github.com/opensearch-project/OpenSearch/pull/6275))
- Clean up temporary files created during segment merge in case segment merge fails ([#6324](https://github.com/opensearch-project/OpenSearch/pull/6324))

### Dependencies
- Update nebula-publishing-plugin to 19.2.0 ([#5704](https://github.com/opensearch-project/OpenSearch/pull/5704))
Expand Down Expand Up @@ -125,4 +126,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.5...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.5...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -2010,6 +2010,7 @@ public void forceMerge(
} catch (Exception e) {
try {
maybeFailEngine("force merge", e);
indexWriter.flush();
} catch (Exception inner) {
e.addSuppressed(inner);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3857,20 +3857,21 @@ protected void doRun() throws Exception {

private static class ThrowingIndexWriter extends IndexWriter {
private AtomicReference<Supplier<Exception>> failureToThrow = new AtomicReference<>();
private AtomicReference<Supplier<Exception>> forceMergeFailureToThrow = new AtomicReference<>();

/**
 * Creates a plain delegating writer over the given directory. No failure is armed
 * initially; failures are injected later via {@link #setThrowFailure} or
 * {@link #setForceMergeThrowFailure}.
 */
ThrowingIndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
super(d, conf);
}

/** Throws the armed document-operation failure (if any) before delegating to Lucene. */
@Override
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
maybeThrowFailure(failureToThrow.get());
return super.addDocument(doc);
}

private void maybeThrowFailure() throws IOException {
if (failureToThrow.get() != null) {
Exception failure = failureToThrow.get().get();
private void maybeThrowFailure(Supplier<Exception> failureToThrow) throws IOException {
if (failureToThrow != null) {
Exception failure = failureToThrow.get();
clearFailure(); // one shot
if (failure instanceof RuntimeException) {
throw (RuntimeException) failure;
Expand All @@ -3884,22 +3885,39 @@ private void maybeThrowFailure() throws IOException {

/** Throws the armed document-operation failure (if any) before delegating to Lucene. */
@Override
public long softUpdateDocument(Term term, Iterable<? extends IndexableField> doc, Field... softDeletes) throws IOException {
maybeThrowFailure(failureToThrow.get());
return super.softUpdateDocument(term, doc, softDeletes);
}

/** Throws the armed document-operation failure (if any) before delegating to Lucene. */
@Override
public long deleteDocuments(Term... terms) throws IOException {
maybeThrowFailure(failureToThrow.get());
return super.deleteDocuments(terms);
}

/**
 * Throws the armed force-merge failure (if any) before delegating, so tests can
 * simulate a merge failing (e.g. disk full) independently of document operations.
 */
@Override
public void forceMerge(int maxNumSegments) throws IOException {
maybeThrowFailure(forceMergeFailureToThrow.get());
super.forceMerge(maxNumSegments);
}

/** Same failure-injection hook as {@link #forceMerge(int)} for the two-arg overload. */
@Override
public void forceMerge(int maxNumSegments, boolean doWait) throws IOException {
maybeThrowFailure(forceMergeFailureToThrow.get());
super.forceMerge(maxNumSegments, doWait);
}

/** Arms a failure for document operations (add / soft-update / delete). */
public void setThrowFailure(Supplier<Exception> failureSupplier) {
failureToThrow.set(failureSupplier);
}

/** Arms a failure for {@code forceMerge} calls only. */
public void setForceMergeThrowFailure(Supplier<Exception> failureSupplier) {
forceMergeFailureToThrow.set(failureSupplier);
}

/** Disarms both the document-operation failure and the force-merge failure. */
public void clearFailure() {
failureToThrow.set(null);
forceMergeFailureToThrow.set(null);
}
}

Expand Down Expand Up @@ -3975,6 +3993,37 @@ public BytesRef binaryValue() {
}
}

public void testHandleForceMergeFailureOnDiskFull() throws Exception {
try (Store store = createStore()) {
// Capture the injected writer and its directory so we can inspect both later.
final AtomicReference<ThrowingIndexWriter> writerHolder = new AtomicReference<>();
final AtomicReference<Directory> directoryHolder = new AtomicReference<>();
try (InternalEngine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, (d, config) -> {
directoryHolder.set(d);
writerHolder.set(new ThrowingIndexWriter(d, config));
return writerHolder.get();
})) {
// Simulate the merge hitting a full disk.
writerHolder.get().setForceMergeThrowFailure(() -> new IOException("No space left on device"));
final ParsedDocument parsedDoc = testParsedDocument(Integer.toString(0), null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(parsedDoc));
// Nothing has been flushed yet, so no compound segment files exist.
assertFalse(containsCompoundFiles(directoryHolder.get().listAll()));
engine.ensureOpen();
expectThrows(IOException.class, () -> engine.forceMerge(false, 1, false, false, false, UUIDs.randomBase64UUID()));
// The failing force-merge triggers an index-writer flush, which writes compound files.
assertTrue(containsCompoundFiles(directoryHolder.get().listAll()));
}
}
}

/**
 * Checks whether the given directory listing contains any Lucene compound-segment
 * files: ".cfs" (compound data) or ".cfe" (compound entry table).
 *
 * Made {@code static} since it reads no instance state, and the C-style array
 * declaration ({@code String files[]}) is replaced with the idiomatic form.
 *
 * @param files file names to scan, e.g. from {@code Directory#listAll()}
 * @return {@code true} if a compound-file entry is present, {@code false} otherwise
 */
private static boolean containsCompoundFiles(final String[] files) {
    for (final String file : files) {
        if (file.endsWith(".cfs") || file.endsWith(".cfe")) {
            return true;
        }
    }
    return false;
}

public void testDeleteWithFatalError() throws Exception {
final IllegalStateException tragicException = new IllegalStateException("fail to store tombstone");
try (Store store = createStore()) {
Expand Down

0 comments on commit 6b35c32

Please sign in to comment.