Skip to content

Commit

Permalink
Merge remote branch 'jw/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
wonlay committed Nov 28, 2011
2 parents e770055 + 8ec3db5 commit 9d9f7da
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 18 deletions.
5 changes: 5 additions & 0 deletions zoie-core/resource/log4j.properties
@@ -0,0 +1,5 @@
log4j.rootLogger=INFO, console1

log4j.appender.console1=org.apache.log4j.ConsoleAppender
log4j.appender.console1.layout=org.apache.log4j.PatternLayout
log4j.appender.console1.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS} %p [%c] [%x] %m%n
Expand Up @@ -458,7 +458,7 @@ public IndexingReq[] buildIndexingReqs() {

super.setBatchSize(Math.max(1, batchSize)); // realtime memory batch size
_diskLoader = new DiskLuceneIndexDataLoader<R>(_analyzer, _similarity,
_searchIdxMgr,versionComparator);
_searchIdxMgr,versionComparator,_lsnrList);
_diskLoader.setOptimizeScheduler(new DefaultOptimizeScheduler(
getAdminMBean())); // note that the ZoieSystemAdminMBean zoieAdmin
// parameter for DefaultOptimizeScheduler is not
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.nio.channels.WritableByteChannel;
import java.util.Collection;
import java.util.Comparator;
import java.util.Queue;

import org.apache.log4j.Logger;
import org.apache.lucene.analysis.Analyzer;
Expand All @@ -32,6 +33,7 @@

import proj.zoie.api.ZoieException;
import proj.zoie.api.ZoieHealth;
import proj.zoie.api.indexing.IndexingEventListener;
import proj.zoie.api.indexing.OptimizeScheduler;
import proj.zoie.api.indexing.OptimizeScheduler.OptimizeType;
import proj.zoie.api.indexing.ZoieIndexable;
Expand All @@ -45,8 +47,8 @@ public class DiskLuceneIndexDataLoader<R extends IndexReader> extends LuceneInde
private Object _optimizeMonitor;
private volatile OptimizeScheduler _optScheduler;

public DiskLuceneIndexDataLoader(Analyzer analyzer, Similarity similarity,SearchIndexManager<R> idxMgr,Comparator<String> comparator) {
super(analyzer, similarity, idxMgr,comparator);
public DiskLuceneIndexDataLoader(Analyzer analyzer, Similarity similarity,SearchIndexManager<R> idxMgr,Comparator<String> comparator,Queue<IndexingEventListener> lsnrList) {
super(analyzer, similarity, idxMgr,comparator,lsnrList);
_lastTimeOptimized=System.currentTimeMillis();
_optimizeMonitor = new Object();
}
Expand Down
Expand Up @@ -26,6 +26,7 @@
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

import org.apache.log4j.Logger;
import org.apache.lucene.analysis.Analyzer;
Expand All @@ -41,6 +42,7 @@
import proj.zoie.api.ZoieHealth;
import proj.zoie.api.ZoieSegmentReader;
import proj.zoie.api.indexing.AbstractZoieIndexable;
import proj.zoie.api.indexing.IndexingEventListener;
import proj.zoie.api.indexing.ZoieIndexable;
import proj.zoie.api.indexing.ZoieIndexable.IndexingReq;

Expand All @@ -53,12 +55,15 @@ public abstract class LuceneIndexDataLoader<R extends IndexReader> implements Da
protected final Comparator<String> _versionComparator;
private Filter _purgeFilter;

protected LuceneIndexDataLoader(Analyzer analyzer, Similarity similarity,SearchIndexManager<R> idxMgr,Comparator<String> versionComparator) {
private final Queue<IndexingEventListener> _lsnrList;

protected LuceneIndexDataLoader(Analyzer analyzer, Similarity similarity,SearchIndexManager<R> idxMgr,Comparator<String> versionComparator,Queue<IndexingEventListener> lsnrList) {
_analyzer = analyzer;
_similarity = similarity;
_idxMgr=idxMgr;
_versionComparator = versionComparator;
_purgeFilter = null;
_lsnrList = lsnrList;
}

public void setPurgeFilter(Filter purgeFilter){
Expand Down Expand Up @@ -187,8 +192,9 @@ public void consume(Collection<DataEvent<ZoieIndexable>> events) throws ZoieExce
for (List<IndexingReq> tmpList : addList.values()) {
docList.addAll(tmpList);
}
idx.updateIndex(delSet, docList, _analyzer,_similarity);

purgeDocuments();
idx.updateIndex(delSet, docList, _analyzer,_similarity);
propagateDeletes(delSet);
synchronized(_idxMgr)
{
Expand Down Expand Up @@ -228,6 +234,7 @@ public void loadFromIndex(RAMSearchIndex<R> ramIndex) throws ZoieException
idx.clearDeletes(); // clear old deletes as deletes are written to the lucene index
// hao: update the disk idx reader
idx.refresh(); // load the index reader
purgeDocuments();
idx.markDeletes(ramIndex.getDelDocs()); // inherit deletes
idx.commitDeletes();
idx.incrementEventCount(ramIndex.getEventsHandled());
Expand Down
Expand Up @@ -19,17 +19,20 @@

import java.io.IOException;
import java.util.Comparator;
import java.util.Queue;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Similarity;

import proj.zoie.api.indexing.IndexingEventListener;

public class RAMLuceneIndexDataLoader<R extends IndexReader> extends LuceneIndexDataLoader<R>
{

public RAMLuceneIndexDataLoader(Analyzer analyzer, Similarity similarity,SearchIndexManager<R> idxMgr,Comparator<String> comparator)
public RAMLuceneIndexDataLoader(Analyzer analyzer, Similarity similarity,SearchIndexManager<R> idxMgr,Comparator<String> comparator,Queue<IndexingEventListener> lsnrList)
{
super(analyzer, similarity,idxMgr,comparator);
super(analyzer, similarity,idxMgr,comparator,lsnrList);
}

@Override
Expand Down
Expand Up @@ -64,7 +64,7 @@ public RealtimeIndexDataLoader(DiskLuceneIndexDataLoader<R> dataLoader, int batc
_analyzer = analyzer;
_similarity = similarity;
_currentBatchSize = 0;
_ramConsumer = new RAMLuceneIndexDataLoader<R>(_analyzer, _similarity, _idxMgr,comparator);
_ramConsumer = new RAMLuceneIndexDataLoader<R>(_analyzer, _similarity, _idxMgr,comparator,lsnrList);
_luceneDataLoader = dataLoader;
}

Expand Down
86 changes: 81 additions & 5 deletions zoie-core/src/test/java/proj/zoie/test/ZoieTest.java
Expand Up @@ -28,29 +28,30 @@
import org.apache.lucene.queryParser.QueryParser;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryWrapperFilter;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Searcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.util.Version;
import org.junit.Test;

import proj.zoie.api.DataConsumer.DataEvent;
import proj.zoie.api.DefaultDirectoryManager;
import proj.zoie.api.DirectoryManager;
import proj.zoie.api.DocIDMapper;
import proj.zoie.api.DocIDMapper.DocIDArray;
import proj.zoie.api.UIDDocIdSet;
import proj.zoie.api.ZoieException;
import proj.zoie.api.ZoieIndexReader;
import proj.zoie.api.DataConsumer.DataEvent;
import proj.zoie.api.DocIDMapper.DocIDArray;

import proj.zoie.api.impl.DocIDMapperImpl;
import proj.zoie.api.impl.InRangeDocIDMapperFactory;
import proj.zoie.impl.indexing.AsyncDataConsumer;
import proj.zoie.impl.indexing.MemoryStreamDataProvider;
import proj.zoie.impl.indexing.ZoieSystem;
import proj.zoie.impl.indexing.ZoieConfig;
import proj.zoie.impl.indexing.ZoieSystem;
import proj.zoie.impl.indexing.internal.IndexSignature;
import proj.zoie.test.data.DataForTests;
import proj.zoie.test.mock.MockDataLoader;
Expand Down Expand Up @@ -96,7 +97,7 @@ public void testIndexWithAnalyzer() throws ZoieException, IOException {
File idxDir = getIdxDir();
ZoieSystem<IndexReader, String> idxSystem = createZoie(
idxDir, true, 20, new WhitespaceAnalyzer(), null,
ZoieConfig.DEFAULT_VERSION_COMPARATOR);
ZoieConfig.DEFAULT_VERSION_COMPARATOR,false);
idxSystem.start();

MemoryStreamDataProvider<String> memoryProvider = new MemoryStreamDataProvider<String>(ZoieConfig.DEFAULT_VERSION_COMPARATOR);
Expand Down Expand Up @@ -258,6 +259,81 @@ public void testRealtime2() throws ZoieException {
}
}

@Test
public void testPurgeFilter() throws Exception {
File idxDir = getIdxDir();
ZoieSystem<IndexReader, String> idxSystem = createZoie(
idxDir, true, ZoieConfig.DEFAULT_VERSION_COMPARATOR,true);

idxSystem.setPurgeFilter(new QueryWrapperFilter(new MatchAllDocsQuery()));
idxSystem.start();

MemoryStreamDataProvider<String> memoryProvider = new MemoryStreamDataProvider<String>(ZoieConfig.DEFAULT_VERSION_COMPARATOR);
memoryProvider.setMaxEventsPerMinute(Long.MAX_VALUE);
memoryProvider.setDataConsumer(idxSystem);
memoryProvider.start();

try {
int count = DataForTests.testdata.length;
List<DataEvent<String>> list = new ArrayList<DataEvent<String>>(
count);
for (int i = 0; i < count; ++i) {
list.add(new DataEvent<String>(
DataForTests.testdata[i], ""+i));
}
memoryProvider.addEvents(list);
memoryProvider.flush();


idxSystem.flushEvents(10000);

List<ZoieIndexReader<IndexReader>> readers = idxSystem
.getIndexReaders();

MultiReader multiReader = new MultiReader(readers.toArray(new IndexReader[0]),false);

IndexSearcher searcher = new IndexSearcher(multiReader);

int numDocs = searcher.search(new MatchAllDocsQuery(), 10).totalHits;

searcher.close();
log.info("numdocs: "+numDocs);
TestCase.assertTrue(numDocs>0);

idxSystem.returnIndexReaders(readers);

idxSystem.getAdminMBean().flushToDiskIndex();


idxSystem.refreshDiskReader();
readers = idxSystem
.getIndexReaders();


multiReader = new MultiReader(readers.toArray(new IndexReader[0]),false);

searcher = new IndexSearcher(multiReader);

numDocs = searcher.search(new MatchAllDocsQuery(), 10).totalHits;

searcher.close();

numDocs = multiReader.numDocs();

log.info("new numdocs: "+numDocs);
TestCase.assertTrue(numDocs==0);

idxSystem.returnIndexReaders(readers);

} catch (IOException ioe) {
throw new ZoieException(ioe.getMessage());
} finally {
memoryProvider.stop();
idxSystem.shutdown();
deleteDirectory(idxDir);
}
}

@Test
public void testStore() throws ZoieException {
File idxDir = getIdxDir();
Expand Down
24 changes: 19 additions & 5 deletions zoie-core/src/test/java/proj/zoie/test/ZoieTestCaseBase.java
Expand Up @@ -20,6 +20,7 @@
import proj.zoie.api.ZoieIndexReader;
import proj.zoie.api.impl.InRangeDocIDMapperFactory;
import proj.zoie.api.indexing.IndexReaderDecorator;
import proj.zoie.impl.indexing.ReaderCacheFactory;
import proj.zoie.impl.indexing.SimpleReaderCache;
import proj.zoie.impl.indexing.ZoieConfig;
import proj.zoie.impl.indexing.ZoieSystem;
Expand Down Expand Up @@ -109,6 +110,18 @@ protected static ZoieSystem<IndexReader,String> createZoie(File idxDir,boolean r
return createZoie(idxDir, realtime, 20, versionComparator);
}


protected static ZoieSystem<IndexReader,String> createZoie(File idxDir,boolean realtime, Comparator<String> versionComparator,boolean immediateRefresh)
{
return createZoie(idxDir, realtime, 20, versionComparator,immediateRefresh);
}

protected static ZoieSystem<IndexReader,String> createZoie(File idxDir,boolean realtime, long delay, Comparator<String> versionComparator,boolean immediateRefresh)
{
return createZoie(idxDir,realtime,delay,null,null,versionComparator,immediateRefresh);
}


/**
* @param idxDir
* @param realtime
Expand All @@ -118,12 +131,12 @@ protected static ZoieSystem<IndexReader,String> createZoie(File idxDir,boolean r
*/
protected static ZoieSystem<IndexReader,String> createZoie(File idxDir,boolean realtime, long delay, Comparator<String> versionComparator)
{
return createZoie(idxDir,realtime,delay,null,null,versionComparator);
return createZoie(idxDir,realtime,delay,null,null,versionComparator,false);
}

protected static ZoieSystem<IndexReader,String> createZoie(File idxDir,boolean realtime,DocIDMapperFactory docidMapperFactory, Comparator<String> versionComparator)
{
return createZoie(idxDir, realtime, 2,null,docidMapperFactory, versionComparator);
return createZoie(idxDir, realtime, 2,null,docidMapperFactory, versionComparator,false);
}

/**
Expand All @@ -135,7 +148,7 @@ protected static ZoieSystem<IndexReader,String> createZoie(File idxDir,boolean r
* @param zoieVersionFactory
* @return
*/
protected static ZoieSystem<IndexReader,String> createZoie(File idxDir,boolean realtime, long delay,Analyzer analyzer,DocIDMapperFactory docidMapperFactory, Comparator<String> versionComparator)
protected static ZoieSystem<IndexReader,String> createZoie(File idxDir,boolean realtime, long delay,Analyzer analyzer,DocIDMapperFactory docidMapperFactory, Comparator<String> versionComparator,boolean immediateRefresh)
{
ZoieConfig config = new ZoieConfig();
config.setDocidMapperFactory(docidMapperFactory);
Expand All @@ -145,8 +158,9 @@ protected static ZoieSystem<IndexReader,String> createZoie(File idxDir,boolean r
config.setVersionComparator(versionComparator);
config.setSimilarity(null);
config.setAnalyzer(null);
// config.setReadercachefactory(SimpleReaderCache.FACTORY);

if (immediateRefresh){
config.setReadercachefactory(SimpleReaderCache.FACTORY);
}
ZoieSystem<IndexReader,String> idxSystem=new ZoieSystem<IndexReader, String>(idxDir,new DataInterpreterForTests(delay,analyzer),
new TestIndexReaderDecorator(),config);
return idxSystem;
Expand Down

0 comments on commit 9d9f7da

Please sign in to comment.