Skip to content

Commit

Permalink
Fix a consitence issue about RAM and Disk index
Browse files Browse the repository at this point in the history
  • Loading branch information
yozhao committed Aug 29, 2013
1 parent 27dfe95 commit dc24281
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 63 deletions.
7 changes: 0 additions & 7 deletions zoie-core/src/main/java/proj/zoie/api/ZoieMultiReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,6 @@ public void commitDeletes() {
}
}

public void setDelDocIds() {
ZoieSegmentReader<R>[] subReaders = getSubReaders();
for (ZoieSegmentReader<R> subReader : subReaders) {
subReader.setDelDocIds();
}
}

public List<R> getDecoratedReaders() throws IOException {
return _decoratedReaders;
}
Expand Down
17 changes: 4 additions & 13 deletions zoie-core/src/main/java/proj/zoie/api/ZoieSegmentReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ public class ZoieSegmentReader<R extends IndexReader> extends FilterAtomicReader
private final IndexReaderDecorator<R> _decorator;
private IntRBTreeSet _delDocIdSet = new IntRBTreeSet();
private int[] _currentDelDocIds = null;
private int[] _delDocIds = null;
private long[] _uidArray = null;
private DocIDMapper _docIDMapper = null;

Expand Down Expand Up @@ -117,7 +116,7 @@ public Bits getLiveDocs() {
return new Bits() {
@Override
public boolean get(int index) {
int[] delSet = _delDocIds;
int[] delSet = _currentDelDocIds;
if (delSet != null && Arrays.binarySearch(delSet, index) >= 0) {
return false;
}
Expand Down Expand Up @@ -155,14 +154,6 @@ public void commitDeletes() {
_currentDelDocIds = _delDocIdSet.toIntArray();
}

public void setDelDocIds() {
_delDocIds = _currentDelDocIds;
}

public int[] getDelDocIds() {
return _delDocIds;
}

public R getDecoratedReader() {
return _decoratedReader;
}
Expand Down Expand Up @@ -192,7 +183,7 @@ public long getUID(int docid) {
}

public boolean isDeleted(int docid) {
int[] delSet = _delDocIds;
int[] delSet = _currentDelDocIds;
if (delSet != null && Arrays.binarySearch(delSet, docid) >= 0) {
return true;
}
Expand All @@ -209,8 +200,8 @@ public String getSegmentName() {

@Override
public int numDocs() {
if (_delDocIds != null) {
return super.numDocs() - _delDocIds.length;
if (_currentDelDocIds != null) {
return super.numDocs() - _currentDelDocIds.length;
} else {
return super.numDocs();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ public void markDeletes(LongSet delDocs) throws IOException {
ZoieMultiReader<R> reader = null;
synchronized (this) {
reader = openIndexReader();
if (reader == null) return;
if (reader == null) {
return;
}
reader.incZoieRef();
reader.markDeletes(delDocs, _delDocs);
reader.decZoieRef();
Expand All @@ -142,12 +144,13 @@ public void commitDeletes() throws IOException {
ZoieMultiReader<R> reader = null;
synchronized (this) {
reader = openIndexReader();
if (reader == null) return;
if (reader == null) {
return;
}
reader.incZoieRef();
reader.commitDeletes();
reader.decZoieRef();
}

}

private void deleteDocs(LongSet delDocs) throws IOException {
Expand All @@ -172,19 +175,21 @@ private void deleteDocs(LongSet delDocs) throws IOException {
}

public void loadFromIndex(BaseSearchIndex<R> index) throws IOException {
LongSet delDocs = null;
// delete docs in disk index first
if (_delDocs != null && _delDocs.size() > 0) {
LongSet delDocs = _delDocs;
clearDeletes();
deleteDocs(delDocs);
synchronized (this) {
if (_delDocs != null && _delDocs.size() > 0) {
delDocs = _delDocs;
clearDeletes();
}
}
deleteDocs(delDocs);

// open readOnly ram index reader
ZoieMultiReader<R> reader = index.openIndexReader();
if (reader == null) {
return;
}

// merge the readOnly ram index with the disk index
IndexWriter writer = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,28 +107,25 @@ public void consume(Collection<DataEvent<ZoieIndexable>> events) throws ZoieExce
@Override
public void loadFromIndex(RAMSearchIndex<R> ramIndex) throws ZoieException {
synchronized (_optimizeMonitor) {
OptimizeType optType = _optScheduler.getScheduledOptimizeType();
_idxMgr.setPartialExpunge(optType == OptimizeType.PARTIAL);
try {
OptimizeType optType = _optScheduler.getScheduledOptimizeType();
_idxMgr.setPartialExpunge(optType == OptimizeType.PARTIAL);
super.loadFromIndex(ramIndex);
} finally {
_idxMgr.setDiskIndexerStatus(Status.Sleep);
_optScheduler.finished();
_idxMgr.setPartialExpunge(false);
}

if (optType == OptimizeType.FULL) {
try {
super.loadFromIndex(ramIndex);
expungeDeletes();
} catch (IOException ioe) {
ZoieHealth.setFatal();
throw new ZoieException(ioe.getMessage(), ioe);
} finally {
_optScheduler.finished();
_idxMgr.setPartialExpunge(false);
}

if (optType == OptimizeType.FULL) {
try {
expungeDeletes();
} catch (IOException ioe) {
ZoieHealth.setFatal();
throw new ZoieException(ioe.getMessage(), ioe);
} finally {
_optScheduler.finished();
}
}
} finally {
_idxMgr.setDiskIndexerStatus(Status.Sleep);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import proj.zoie.api.indexing.AbstractZoieIndexable;
import proj.zoie.api.indexing.ZoieIndexable;
import proj.zoie.api.indexing.ZoieIndexable.IndexingReq;
import proj.zoie.impl.indexing.internal.SearchIndexManager.Status;

public abstract class LuceneIndexDataLoader<R extends IndexReader> implements
DataConsumer<ZoieIndexable> {
Expand Down Expand Up @@ -109,7 +110,9 @@ private final void purgeDocuments() {
@Override
public void consume(Collection<DataEvent<ZoieIndexable>> events) throws ZoieException {

if (events == null) return;
if (events == null) {
return;
}
int eventCount = events.size();
if (eventCount == 0) {
return;
Expand Down Expand Up @@ -215,14 +218,16 @@ public void loadFromIndex(RAMSearchIndex<R> ramIndex) throws ZoieException {
.compare(idx.getVersion(), ramIndex.getVersion()) < 0 ? ramIndex.getVersion() : idx
.getVersion());
idx.setVersion(newVersion);
// update the disk idx reader
idx.refresh();
purgeDocuments();
// inherit deletes
idx.markDeletes(ramIndex.getDelDocs());
idx.commitDeletes();
idx.incrementEventCount(ramIndex.getEventsHandled());

synchronized (_idxMgr) {
// update the disk idx reader
idx.refresh();
purgeDocuments();
// inherit deletes
idx.markDeletes(ramIndex.getDelDocs());
idx.commitDeletes();
idx.incrementEventCount(ramIndex.getEventsHandled());
_idxMgr.setDiskIndexerStatus(Status.Sleep);
}
} catch (IOException ioe) {
ZoieHealth.setFatal();
log.error("Problem copying segments: " + ioe.getMessage(), ioe);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ protected void propagateDeletes(LongSet delDocs) throws IOException {
if (delDocs == null || delDocs.size() == 0) {
return;
}

RAMSearchIndex<R> readOnlyMemoryIdx = _idxMgr.getCurrentReadOnlyMemoryIndex();
if (readOnlyMemoryIdx != null) {
readOnlyMemoryIdx.markDeletes(delDocs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,7 @@ public void consume(Collection<DataEvent<D>> events) throws ZoieException {
synchronized (this) // this blocks the batch disk loader thread while indexing to RAM
{
int size = indexableList.size();
synchronized (_idxMgr) {
_ramConsumer.consume(indexableList);// consumer clear the list!
}
_ramConsumer.consume(indexableList);// consumer clear the list!
_currentBatchSize += size;
_eventCount += size;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,13 @@ public List<ZoieMultiReader<R>> getIndexReaders() throws IOException {

// the following order, e.g. B,A,Disk matters, see ZoieMultiReader.getSubZoieReaderAccessor:
// when doing UID->docid mapping, the freshest index needs to be first

if (memIndexB != null) // load memory index B
{
synchronized (memIndexB) {
reader = memIndexB.openIndexReader();
if (reader != null) {
reader = reader.copy();
reader.setDelDocIds();
readers.add(reader);
}
}
Expand All @@ -217,7 +217,6 @@ public List<ZoieMultiReader<R>> getIndexReaders() throws IOException {
reader = memIndexA.openIndexReader();
if (reader != null) {
reader = reader.copy();
reader.setDelDocIds();
readers.add(reader);
}
}
Expand All @@ -229,10 +228,8 @@ public List<ZoieMultiReader<R>> getIndexReaders() throws IOException {
reader = mem.get_diskIndexReader();
if (reader != null) {
reader = reader.copy();
reader.setDelDocIds();
readers.add(reader);
}

_diskVersion = getCurrentDiskVersion();
}
}
Expand Down Expand Up @@ -266,7 +263,7 @@ public void returnIndexReaders(List<ZoieMultiReader<R>> readers) {

public synchronized void setDiskIndexerStatus(Status status) {
if (_diskIndexerStatus == status) {
throw new RuntimeException("Wrong status to set");
return;
}
// going from sleep to wake, disk index starts to index
// which according to the spec, index B is created and it starts to collect data
Expand All @@ -278,10 +275,13 @@ public synchronized void setDiskIndexerStatus(Status status) {
Mem<R> oldMem = _mem;

RAMSearchIndex<R> memIndexA = oldMem.get_memIndexA();
if (memIndexA != null) memIndexA.closeIndexWriter();
if (memIndexA != null) {
memIndexA.closeIndexWriter();
}

RAMSearchIndex<R> memIndexB = _ramIndexFactory.newInstance(version, _indexReaderDecorator,
this);

Mem<R> mem = new Mem<R>(memIndexA, memIndexB, memIndexB, memIndexA,
oldMem.get_diskIndexReader());
synchronized (_memLock) {
Expand Down

0 comments on commit dc24281

Please sign in to comment.