Skip to content

Commit

Permalink
ISPN-8169 DirectoryImplementerTests.testConfigureAsyncDeletes random …
Browse files Browse the repository at this point in the history
…failures
  • Loading branch information
gustavonalle authored and anistor committed Aug 3, 2017
1 parent 5f30d03 commit 34acdf4
Showing 1 changed file with 38 additions and 64 deletions.
@@ -1,33 +1,32 @@
package org.infinispan.lucene.impl;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockFactory;
import org.infinispan.Cache;
import org.infinispan.commons.util.concurrent.ConcurrentHashSet;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.lucene.CacheTestSupport;
import org.infinispan.lucene.directory.DirectoryBuilder;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.test.SingleCacheManagerTest;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

/**
* Tests covering DirecotoryImplementor class.
* Tests covering DirectoryImplementor class.
*
* @author Anna Manukyan
*/
Expand Down Expand Up @@ -71,7 +70,7 @@ public void testFailureOfOverrideWriteLocker() throws IOException {
public void testOverrideWriteLocker() throws IOException {
Directory dir = null;
try {
dir = DirectoryBuilder.newDirectoryInstance(cache, cache, cache, INDEX_NAME).chunkSize(BUFFER_SIZE)
dir = DirectoryBuilder.newDirectoryInstance(cache, cache, cache, INDEX_NAME).chunkSize(BUFFER_SIZE)
.overrideWriteLocker(new LockFactory() {
@Override
public Lock obtainLock(Directory dir, String lockName) throws IOException {
Expand Down Expand Up @@ -102,67 +101,42 @@ public void testGetIndexNameAndToString() throws IOException {
}
}

private void createFile(Directory directory, String name) throws IOException {
IndexOutput indexOutput = directory.createOutput(name, IOContext.DEFAULT);
indexOutput.writeByte((byte) 0);
indexOutput.close();
}

@Test
public void testConfigureAsyncDeletes() throws Exception {
Cache cache = cacheManager.getCache();
Directory dir = null;
IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
iwc.setMaxBufferedDocs(2);
IndexWriter indexWriter = null;
Document document = new Document();
document.add(new TextField("field", "whatever", Field.Store.YES));
TrackingThreadPoolExecutor executorService = new TrackingThreadPoolExecutor();
try {
dir = DirectoryBuilder.newDirectoryInstance(cache, cache, cache, INDEX_NAME)
.chunkSize(BUFFER_SIZE)
.deleteOperationsExecutor(executorService)
.create();

indexWriter = new IndexWriter(dir, iwc);
indexWriter.addDocument(document);
indexWriter.commit();

indexWriter.addDocument(document);
indexWriter.commit();
ExecutorService executor = spy(Executors.newFixedThreadPool(1));

executorService.shutdown();
try (Directory directory = DirectoryBuilder.newDirectoryInstance(cache, cache, cache, INDEX_NAME)
.chunkSize(BUFFER_SIZE)
.deleteOperationsExecutor(executor)
.create()) {

AssertJUnit.assertTrue(executorService.isSegmentDeleted("0"));
AssertJUnit.assertTrue(executorService.isSegmentDeleted("1"));
createFile(directory, "file");

} finally {
if (indexWriter != null) indexWriter.close();
if (dir != null) dir.close();
}
}
assertEquals(directory.listAll().length, 1);

class TrackingThreadPoolExecutor extends ThreadPoolExecutor {
private final Set<String> deletedSegments = new ConcurrentHashSet<>();
directory.deleteFile("file");

TrackingThreadPoolExecutor() {
super(0, 5, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new CallerRunsPolicy());
}
eventuallyEquals(0, () -> {
try {
return directory.listAll().length;
} catch (IOException e) {
Assert.fail("Error inspecting directory", e);
return null;
}
});

private String extractSegmentName(String fileName) {
if (!fileName.startsWith("_")) {
return null;
} else {
return fileName.substring(1, fileName.indexOf('.'));
}
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
DirectoryLucene.DeleteTask task = (DirectoryLucene.DeleteTask) r;
String name = task.getFileName();
String segment = extractSegmentName(name);
if (segment != null) {
deletedSegments.add(segment);
}
}
verify(executor).execute(any(DirectoryLucene.DeleteTask.class));

public boolean isSegmentDeleted(String segmentName) {
return deletedSegments.contains(segmentName);
} finally {
executor.shutdownNow();
}
}
}

0 comments on commit 34acdf4

Please sign in to comment.