Skip to content

Commit

Permalink
Merge branch 'master' of github.com:senseidb/zoie
Browse files Browse the repository at this point in the history
  • Loading branch information
vzhabiuk committed Apr 6, 2013
2 parents 42b213b + a204bf4 commit c1dad1e
Show file tree
Hide file tree
Showing 19 changed files with 102 additions and 77 deletions.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -22,7 +22,7 @@ Issues are tracked at:

Maven:

groupId: com.linkedin.zoie
groupId: com.senseidb.zoie

artifactId: zoie-core

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -23,7 +23,7 @@
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.linkedin.zoie</groupId>
<groupId>com.senseidb.zoie</groupId>
<artifactId>zoie-parent</artifactId>
<version>3.2.1-SNAPSHOT</version>
<relativePath>zoie-parent/pom.xml</relativePath>
Expand Down
8 changes: 5 additions & 3 deletions zoie-core/pom.xml
Expand Up @@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.linkedin.zoie</groupId>
<groupId>com.senseidb.zoie</groupId>
<artifactId>zoie-parent</artifactId>
<version>3.2.1-SNAPSHOT</version>
<relativePath>../zoie-parent/pom.xml</relativePath>
Expand Down Expand Up @@ -54,11 +54,13 @@
<artifactId>git-commit-id-plugin</artifactId>
<version>1.9</version>
<executions>

<execution>
<goals>
<goal>revision</goal>
</goals>
</execution>

</executions>
<configuration>
<prefix>git</prefix>
Expand All @@ -80,9 +82,9 @@
<version>1.2.13</version>
</dependency>
<dependency>
<groupId>fastutil</groupId>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
<version>5.0.5</version>
<version>6.4.4</version>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
Expand Down
2 changes: 1 addition & 1 deletion zoie-core/src/main/java/proj/zoie/api/ZoieMultiReader.java
Expand Up @@ -163,7 +163,7 @@ public void markDeletes(LongSet delDocs, LongSet deletedUIDs)
}
}
}

@Override
public void commitDeletes()
{
Expand Down
Expand Up @@ -57,7 +57,6 @@ public void setMergePolicyParams(MergePolicyParams params){
setNumLargeSegments(params._numLargeSegments);
setMergeFactor(params._mergeFactor);
setMaxSmallSegments(params._maxSmallSegments);
setPartialExpunge(params._doPartialExpunge);
setUseCompoundFile(params._useCompoundFile);
setMaxMergeDocs(params._maxMergeDocs);
}
Expand Down
Expand Up @@ -310,7 +310,7 @@ protected final void flushBuffer()
{
log.debug("flushBuffer: post-flush: currentVersion: " + _currentVersion);
}
this.notifyAll(); // wake up the thread waiting in syncWthVersion()
this.notifyAll(); // wake up the thread waiting in syncWithVersion()
}
}

Expand Down
Expand Up @@ -44,7 +44,6 @@ public StreamDataProvider(Comparator<String> versionComparator)
{
_batchSize = 1;
_consumer = null;

_versionComparator = versionComparator;
}

Expand Down Expand Up @@ -91,6 +90,7 @@ public long getMaxEventsPerMinute()
}

private volatile long _maxEventsPerMinute = Long.MAX_VALUE;// begin with no
private volatile long _maxVolatileTimeInMillis = Long.MAX_VALUE; // begin with no volatile time limit

// indexing

Expand All @@ -101,7 +101,13 @@ public void setMaxEventsPerMinute(long maxEventsPerMinute)
if (thread == null)
return;
thread.setMaxEventsPerMinute(_maxEventsPerMinute);

}

public void setMaxVolatileTime(long timeInMillis) {
_maxVolatileTimeInMillis = timeInMillis;
DataThread<D> thread = _thread;
if (thread == null) return;
thread.setMaxVolatileTime(_maxVolatileTimeInMillis);
}

public String getStatus()
Expand Down Expand Up @@ -165,19 +171,14 @@ public void start()

_thread = new DataThread<D>(this);
_thread.setMaxEventsPerMinute(_maxEventsPerMinute);

_thread.setMaxVolatileTime(_maxVolatileTimeInMillis);
_thread.start();
}
}

public void syncWithVersion(long timeToWait, String version) throws ZoieException
public void syncWithVersion(long timeInMillis, String version) throws ZoieException
{
_thread.syncWthVersion(timeToWait, version);
}

public void syncWthVersion(long timeInMillis, String version) throws ZoieException
{
_thread.syncWthVersion(timeInMillis, version);
_thread.syncWithVersion(timeInMillis, version);
}

private static final class DataThread<D> extends Thread
Expand All @@ -188,7 +189,9 @@ private static final class DataThread<D> extends Thread
private volatile boolean _paused;
private volatile boolean _stop;
private AtomicLong _eventCount = new AtomicLong(0);
private volatile long _throttle = 40000;// Long.MAX_VALUE;
private volatile long _throttle = 40000;
private volatile long _maxVolatileTimeInMillis = Long.MAX_VALUE;
private volatile long _lastFlushTime = System.currentTimeMillis();
private boolean _flushing = false;
private final Comparator<String> _versionComparator;

Expand Down Expand Up @@ -271,6 +274,7 @@ private void flush()
{
log.error(e.getMessage(), e);
}
_lastFlushTime = System.currentTimeMillis();
}

private long lastcount = 0;
Expand All @@ -291,7 +295,7 @@ private synchronized void updateStats()
last60[currentslot] += count;
}

public void syncWthVersion(long timeInMillis, String version) throws ZoieException
public void syncWithVersion(long timeInMillis, String version) throws ZoieException
{
if (version == null) return;
long now = System.currentTimeMillis();
Expand Down Expand Up @@ -354,7 +358,8 @@ public void run()
synchronized (this)
{
_batch.add(data);
if (_batch.size() >= _dataProvider._batchSize || _flushing)
if (_batch.size() >= _dataProvider._batchSize || _flushing
|| System.currentTimeMillis() - _lastFlushTime > _maxVolatileTimeInMillis)
{
flush();
_currentVersion = version;
Expand All @@ -365,7 +370,7 @@ public void run()
{
synchronized (this)
{
if (_flushing && (_batch.size() > 0))
if (_flushing || _batch.size() > 0)
{
flush();
_currentVersion = version;
Expand Down Expand Up @@ -415,6 +420,11 @@ private void setMaxEventsPerMinute(long maxEventsPerMinute)
{
_throttle = maxEventsPerMinute;
}

private void setMaxVolatileTime(long timeInMillis)
{
_maxVolatileTimeInMillis = timeInMillis;
}

}
}
Expand Up @@ -88,13 +88,19 @@ public void close()

abstract protected IndexReader openIndexReaderForDelete() throws IOException;

abstract public void refresh() throws IOException;
abstract public void refresh() throws IOException;

public void updateIndex(LongSet delDocs, List<IndexingReq> insertDocs,Analyzer defaultAnalyzer,Similarity similarity)
public void updateIndex(LongSet delDocs, List<IndexingReq> insertDocs,Analyzer defaultAnalyzer,Similarity similarity)
throws IOException
{
deleteDocs(delDocs);
if (delDocs != null && delDocs.size() > 0) {
deleteDocs(delDocs);
}

if (insertDocs == null || insertDocs.size() == 0) {
return;
}

IndexWriter idxMod = null;
try
{
Expand Down Expand Up @@ -153,7 +159,7 @@ public void markDeletes(LongSet delDocs) throws IOException
reader.decZoieRef();
}
}

public void commitDeletes() throws IOException
{
ZoieIndexReader<R> reader = null;
Expand Down Expand Up @@ -234,17 +240,19 @@ private void deleteDocs(LongSet delDocs) throws IOException

public void loadFromIndex(BaseSearchIndex<R> index) throws IOException
{
// yozhao: delete docs in disk index first
if (_delDocs != null && _delDocs.size() > 0) {
LongSet delDocs = _delDocs;
clearDeletes();
deleteDocs(delDocs);
}

// hao: open readOnly ram index reader
ZoieIndexReader<R> reader = index.openIndexReader();
if(reader == null) return;

Directory dir = reader.directory();

// hao: delete docs in disk index
LongSet delDocs = _delDocs;
clearDeletes();
deleteDocs(delDocs);


// hao: merge the readOnly ram index with the disk index
IndexWriter writer = null;
try
Expand All @@ -258,9 +266,9 @@ public void loadFromIndex(BaseSearchIndex<R> index) throws IOException
closeIndexWriter();
}
}



abstract public IndexWriter openIndexWriter(Analyzer analyzer,Similarity similarity) throws IOException;

public void closeIndexWriter()
Expand Down
Expand Up @@ -33,30 +33,23 @@ public class IndexReaderDispenser<R extends IndexReader>
private static final Logger log = Logger.getLogger(IndexReaderDispenser.class);

private static final int INDEX_OPEN_NUM_RETRIES=5;

// public static final String INDEX_DIRECTORY = "index.directory";


static final class InternalIndexReader<R extends IndexReader> extends ZoieMultiReader<R>
{
//private IndexSignature _sig;
private final IndexReaderDispenser<R> _dispenser;

InternalIndexReader(IndexReader in,IndexReaderDecorator<R> decorator,IndexReaderDispenser<R> dispenser) throws IOException
InternalIndexReader(IndexReader in,IndexReaderDecorator<R> decorator) throws IOException
{
super(in, decorator);
_dispenser = dispenser;
}

public InternalIndexReader(IndexReader in, IndexReader[] subReaders, IndexReaderDecorator<R> decorator,IndexReaderDispenser<R> dispenser) throws IOException
public InternalIndexReader(IndexReader in, IndexReader[] subReaders, IndexReaderDecorator<R> decorator) throws IOException
{
super(in, subReaders, decorator);
_dispenser = dispenser;
}

@Override
protected ZoieMultiReader<R> newInstance(IndexReader inner, IndexReader[] subReaders) throws IOException
{
return new InternalIndexReader<R>(inner,subReaders,_decorator,_dispenser);
return new InternalIndexReader<R>(inner,subReaders,_decorator);
}
}

Expand All @@ -68,7 +61,7 @@ protected ZoieMultiReader<R> newInstance(IndexReader inner, IndexReader[] subRea

public IndexReaderDispenser(DirectoryManager dirMgr, IndexReaderDecorator<R> decorator,DiskSearchIndex<R> idx)
{
_idx = idx;
_idx = idx;
_dirMgr = dirMgr;
_decorator = decorator;
_currentSignature = null;
Expand Down Expand Up @@ -134,7 +127,7 @@ private InternalIndexReader<R> newReader(DirectoryManager dirMgr, IndexReaderDec

try
{
reader=new InternalIndexReader<R>(srcReader, decorator,this);
reader=new InternalIndexReader<R>(srcReader, decorator);
_currentSignature = signature;
}
catch(IOException ioe)
Expand Down
Expand Up @@ -43,6 +43,9 @@ protected BaseSearchIndex<R> getSearchIndex() {
@Override
protected void propagateDeletes(LongSet delDocs) throws IOException
{
if (delDocs == null || delDocs.size() == 0) {
return;
}
RAMSearchIndex<R> readOnlyMemoryIdx = _idxMgr.getCurrentReadOnlyMemoryIndex();
if(readOnlyMemoryIdx != null)
{
Expand Down

0 comments on commit c1dad1e

Please sign in to comment.