Many Performance and Stability improvements for Birch as found during Stability and Performance Testing

-Fix a signing issue when serializing and deserializing an unsigned short (see the unsigned-short sketch after this list), use a reusable buffer in PageAlignedWriter, and other bug fixes
-Fix the root cause of a rare assertion seen during repairs while the instance is under high load
-Remove all memory-mapped logic from PageAlignedReader and replace it with a new on-heap buffered reader, BufferingPageAlignedReader
-Address a few more TODOs; clean up and improve SegmentedIterator logic, including Closeable/AutoCloseable behavior; add more comments
-Many improvements and changes to the SegmentedFile.SegmentIterator implementation so it works better with PageAligned segments
-Handle iterators that implement AutoCloseable in CollationController
-Fix many file descriptor leaks; verified with multiple stress loads for 24+ hours
-Hard-code disabling of the early-open logic in SSTableRewriter, as it doesn't work with the Birch index design, which requires the index to be finalized at the end of file serialization
-Limit the resized capacity of the Overflow backing buffer to Integer.MAX_VALUE
-Switch Overflow to a directly allocated ByteBuffer instead of a heap byte buffer, as large overflow pages can currently put significant pressure on the GC during compaction (resize logic sketched after this list)
-Speculative fix for a java.lang.NullPointerException thrown at PageAlignedReader.getAlignedSegmentAtIdx(PageAlignedReader.java:128), caused by premature closing of the ifile, which should still be open at SSTableScanner$KeyScanningIterator.computeNext(SSTableScanner.java:271)
-Don't close IndexedEntry in SSTableNamesIterator: if SSTableScanner returns an SSTableNamesIterator, it hasn't yet hit EOF of the ifile, so we don't want to close the file descriptor
-Fix a limit-vs-position mixup that caused an incorrect key and value to be added for some Composites in some cases
-Fix serializedSize of IndexInfo key
-Use fromByteBuffer instead of Composite deserialize crap
-Avoid potentially performing an expensive toByteBuffer on a Composite multiple times
-Horrible typo: 1 << 21 is 2MB, not 2GB
-Now that the result of toByteBuffer() on the firstName Composite is reused, we need (more than ever) to duplicate() the buffer and return that (see the duplicate() sketch after this list)
-Experimental removal of all mmap buffer logic from PageAlignedReader due to discovered performance concerns; read from the RAF directly instead
-Many performance-related refactorings of PageAlignedReader, AlignedSegments, and the aligned implementation of SegmentedFile
-Add new metrics tracking the number of BirchReader operations that require going to the overflow page, and the number of bytes read per key during a Birch lookup
-Implement BufferingPageAlignedReader, which returns bytes as needed from the RAF via an on-heap backing buffer aligned to the page size of the file being read by the PageAlignedReader instance (a sketch follows this list)
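
The unsigned-short item refers to the standard Java pitfall: Java has no unsigned types, so a 16-bit length written with writeShort() comes back negative for values above 0x7FFF unless it is masked on the read side. A minimal sketch of the usual pattern, with hypothetical method names rather than the actual PageAlignedWriter API:

import java.io.*;

final class UnsignedShortSketch
{
    // Writing: only the low 16 bits of `value` are stored, so 0..65535 round-trips safely.
    static void writeUnsignedShort(DataOutput out, int value) throws IOException
    {
        assert value >= 0 && value <= 0xFFFF;
        out.writeShort(value);
    }

    // Reading: readShort() sign-extends, so stored values 0x8000..0xFFFF would
    // come back negative. Masking with 0xFFFF recovers the unsigned value.
    static int readUnsignedShort(DataInput in) throws IOException
    {
        return in.readShort() & 0xFFFF;
    }
}

(java.io.DataInput also offers readUnsignedShort() directly, which performs the same masking.)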
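
The two Overflow items describe capping buffer growth and moving the backing storage off-heap. A hedged sketch of what that resize path can look like; the method name and doubling growth policy are assumptions, not the committed Overflow implementation:

import java.nio.ByteBuffer;

final class OverflowResizeSketch
{
    // Grow a direct (off-heap) buffer by doubling until `required` fits,
    // never past Integer.MAX_VALUE, the hard cap on ByteBuffer capacity.
    static ByteBuffer ensureCapacity(ByteBuffer current, int required)
    {
        if (current.capacity() >= required)
            return current;

        long newCapacity = Math.max(current.capacity(), 1);
        while (newCapacity < required)
            newCapacity <<= 1;
        newCapacity = Math.min(newCapacity, Integer.MAX_VALUE);

        // allocateDirect keeps large overflow pages out of the Java heap,
        // which is the point of the GC-pressure fix during compaction.
        ByteBuffer resized = ByteBuffer.allocateDirect((int) newCapacity);
        current.flip();
        resized.put(current);
        return resized;
    }
}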
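
The limit-vs-position fix, the toByteBuffer() reuse, and the duplicate() note all guard against the same ByteBuffer hazard: position and limit are mutable state, so a cached buffer shared between callers can be silently corrupted. The usual defense, sketched here with illustrative names rather than the actual Composite code, is to cache the expensive conversion once and hand out duplicate() views:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class CachedBufferSketch
{
    private ByteBuffer cached; // result of a one-time expensive conversion

    ByteBuffer toByteBuffer()
    {
        if (cached == null)
            cached = expensiveSerialize(); // e.g. a Composite -> ByteBuffer conversion
        // duplicate() shares the underlying bytes but gives the caller an
        // independent position/limit/mark, so one reader can't break another.
        return cached.duplicate();
    }

    private ByteBuffer expensiveSerialize()
    {
        return ByteBuffer.wrap("serialized-form".getBytes(StandardCharsets.UTF_8));
    }
}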
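
For the final item, a minimal sketch of the refill-on-miss idea behind an on-heap buffered reader over a RandomAccessFile; this illustrates the approach the bullet describes, not the committed BufferingPageAlignedReader class:

import java.io.IOException;
import java.io.RandomAccessFile;

final class BufferedPageReaderSketch
{
    private final RandomAccessFile raf;
    private final byte[] buffer;    // holds one aligned page of the file
    private long bufferStart = -1;  // file offset of buffer[0]; -1 means empty
    private int bufferLength = 0;

    BufferedPageReaderSketch(RandomAccessFile raf, int pageSize)
    {
        this.raf = raf;
        this.buffer = new byte[pageSize];
    }

    byte readByteAt(long offset) throws IOException
    {
        // Serve from the current page if possible; otherwise refill.
        if (bufferStart < 0 || offset < bufferStart || offset >= bufferStart + bufferLength)
            fill(offset);
        return buffer[(int) (offset - bufferStart)];
    }

    // Refill starting at the page boundary containing `offset`, so sequential
    // reads within the same page never touch the file descriptor again.
    private void fill(long offset) throws IOException
    {
        bufferStart = (offset / buffer.length) * buffer.length;
        raf.seek(bufferStart);
        bufferLength = Math.max(0, raf.read(buffer, 0, buffer.length));
        if (offset >= bufferStart + bufferLength)
            throw new IOException("read past EOF at offset " + offset);
    }
}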
Michael Kjellman committed Oct 6, 2016
1 parent 471ce65 commit 41c6d43
Showing 37 changed files with 1,640 additions and 1,020 deletions.
@@ -352,7 +352,7 @@ else if (conf.disk_access_mode == Config.DiskAccessMode.mmap_index_only)
}
else
{
indexAccessMode = conf.disk_access_mode;
indexAccessMode = mmap_cache_aligned;
logger.info("DiskAccessMode is {}, indexAccessMode is {}", conf.disk_access_mode, indexAccessMode);
}
}
45 changes: 17 additions & 28 deletions src/java/org/apache/cassandra/db/BirchIndexedEntry.java
@@ -29,22 +29,22 @@
import org.apache.cassandra.db.index.birch.BirchReader;
import org.apache.cassandra.db.index.birch.PageAlignedReader;
import org.apache.cassandra.io.sstable.IndexInfo;
import org.apache.cassandra.io.util.FileUtils;

/**
* An IndexedEntry implementation that is backed by a BirchWriter Index
*/
public class BirchIndexedEntry implements IndexedEntry
public class BirchIndexedEntry implements IndexedEntry, AutoCloseable
{
private final long position;
private final CType type;
private final PageAlignedReader reader;
private final BirchReader<IndexInfo> birchReader;
private final DeletionTime deletionTime;
private final AlignedSegment readerSegment;
private final int readerSegmentIdx;
private final short readerSubSegmentIdx;

private BirchReader<IndexInfo>.BirchIterator iterator = null;
private int nextIndexIdx = -1;
private int lastDeserializedBlock = -1;
private boolean iteratorDirectionReversed = false;

public BirchIndexedEntry(long position, CType type, PageAlignedReader reader, DeletionTime deletionTime) throws IOException
@@ -54,7 +54,8 @@ public BirchIndexedEntry(long position, CType type, PageAlignedReader reader, De
this.reader = reader;
this.deletionTime = deletionTime;
this.birchReader = new BirchReader<>(reader);
this.readerSegment = reader.getCurrentSegment();
this.readerSegmentIdx = reader.getCurrentSegmentIdx();
this.readerSubSegmentIdx = reader.getCurrentSubSegmentIdx();
}

public boolean isIndexed()
@@ -95,8 +96,8 @@ public List<IndexInfo> getAllColumnIndexes()

public IndexInfo getIndexInfo(Composite name, CellNameType comparator, boolean reversed) throws IOException
{
assert !reader.getCurrentSubSegment().shouldUseSingleMmappedBuffer()
&& reader.getCurrentSegment().idx == readerSegment.idx;
assert reader.isCurrentSubSegmentPageAligned()
&& reader.getCurrentSegmentIdx() == readerSegmentIdx;

iteratorDirectionReversed = reversed;
return birchReader.search(name, comparator, reversed);
@@ -109,31 +110,19 @@ public static Comparator<IndexInfo> getComparator(final CType nameComparator, bo

public boolean hasNext()
{
assert reader.getCurrentSegment().idx == readerSegment.idx;

if (lastDeserializedBlock == nextIndexIdx)
{
if (iteratorDirectionReversed)
nextIndexIdx--;
else
nextIndexIdx++;
}

assert reader.getCurrentSegmentIdx() == readerSegmentIdx;
return iterator != null && iterator.hasNext();
}

public IndexInfo next()
{
assert reader.getCurrentSegment().idx == readerSegment.idx;

lastDeserializedBlock = nextIndexIdx;

assert reader.getCurrentSegmentIdx() == readerSegmentIdx;
return (iterator != null) ? iterator.next() : null;
}

public void startIteratorAt(Composite name, CellNameType comparator, boolean reversed) throws IOException
{
assert reader.getCurrentSegment().idx == readerSegment.idx;
assert reader.getCurrentSegmentIdx() == readerSegmentIdx;

iteratorDirectionReversed = reversed;
iterator = birchReader.getIterator(name, comparator, reversed);
@@ -144,26 +133,26 @@ public boolean isReversed()
return iteratorDirectionReversed;
}

@Override
public void close()
{
// todo kjkj should BirchIndexedEntry close the reader?
//FileUtils.closeQuietly(reader);
FileUtils.closeQuietly(birchReader);
FileUtils.closeQuietly(reader);
}

public void reset(boolean reversed, long position)
public void reset(boolean reversed)
{
try
{
reader.setSegment(readerSegment.idx, 1);
// the birch index is always in the 2nd sub-segment
reader.setSegment(readerSegmentIdx, 1);
this.iterator = birchReader.getIterator(type, reversed);
}
catch (IOException e)
{
throw new RuntimeException(e);
}

lastDeserializedBlock = -1;
nextIndexIdx = -1;
iteratorDirectionReversed = reversed;
}
}
8 changes: 8 additions & 0 deletions src/java/org/apache/cassandra/db/CollationController.java
@@ -323,8 +323,16 @@ public Cell apply(Cell cell)
finally
{
for (Object iter : iterators)
{
if (iter instanceof Closeable)
{
FileUtils.closeQuietly((Closeable) iter);
}
else if (iter instanceof AutoCloseable)
{
FileUtils.closeQuietly((AutoCloseable) iter);
}
}
}
}

14 changes: 7 additions & 7 deletions src/java/org/apache/cassandra/db/IndexedEntry.java
@@ -45,7 +45,7 @@ public interface IndexedEntry extends IMeasurableMemory, Iterator<IndexInfo>

void startIteratorAt(Composite name, CellNameType comparator, boolean reversed) throws IOException;

void reset(boolean reversed, long position);
void reset(boolean reversed);

void close();

@@ -128,7 +128,7 @@ public IndexedEntry deserialize(DataInput in, Descriptor.Version version) throws
DeletionTime deletionTime = DeletionTime.serializer.deserialize(in);
reader.nextSubSegment();
IndexedEntry entry = new BirchIndexedEntry(position, type, reader, deletionTime);
reader.seek(reader.getCurrentSubSegment().getEndOffset());
reader.seekToEndOfCurrentSubSegment();
return entry;
}
else
@@ -168,23 +168,23 @@ public static void skip(DataInput in, Descriptor.Version version) throws IOExcep
// todo: kjkj can this be cleaned up/simplified? e.g. i think the logic should/could be,
// always set to start of next segment unless current segment is the last segment, in
// which case seek to the end of the current segment
if (reader.getCurrentSegment().idx == reader.numberOfSegments() - 1)
if (reader.getCurrentSegmentIdx() == reader.numberOfSegments() - 1)
{
// if the current sub-segment is the last available sub-segment (and current is the last segment)
// seek to the end of the sub-segment to ensure isEOF will return true regardless of iteration order
if (reader.getCurrentSubSegment().idx == reader.getCurrentSegment().getSubSegments().size() - 1)
if (reader.getCurrentSubSegmentIdx() == reader.numberOfSubSegments() - 1)
{
reader.seek(reader.getCurrentSegment().getLastSubSegment().getEndOffset());
reader.seekToEndOfCurrentSegment();
}
else
{
// skip to the beginning of the next sub-segment
reader.setSegment(reader.getCurrentSubSegment().idx, reader.getCurrentSubSegment().idx + 1);
reader.setSegment(reader.getCurrentSegmentIdx(), reader.getCurrentSubSegmentIdx() + 1);
}
}
else
{
reader.setSegment(reader.getCurrentSegment().idx + 1);
reader.setSegment(reader.getCurrentSegmentIdx() + 1);
}
}
else
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/NonIndexedRowEntry.java
@@ -106,7 +106,7 @@ public void close()
//throw new UnsupportedOperationException();
}

public void reset(boolean reversed, long position)
public void reset(boolean reversed)
{
throw new UnsupportedOperationException();
}
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/OnHeapIndexedEntry.java
@@ -162,7 +162,7 @@ public void close()

}

public void reset(boolean reversed, long position)
public void reset(boolean reversed)
{
iteratorDirectionReversed = reversed;
lastDeserializedBlock = -1;
@@ -85,7 +85,7 @@ public IndexedSliceReader(SSTableReader sstable, IndexedEntry indexEntry, FileDa
}
else
{
indexEntry.reset(reversed, 0); //tmp kjkjk
indexEntry.reset(reversed);
emptyColumnFamily.delete(indexEntry.deletionTime());
fetcher = new IndexedBlockFetcher(indexEntry.getPosition());
}
@@ -153,7 +153,7 @@ public void close() throws IOException
{
if (originalInput == null && file != null)
file.close();
indexedEntry.close(); // todo: kjkj is this the right place to close?
indexedEntry.close();
}

protected void addColumn(OnDiskAtom col)
@@ -60,8 +60,8 @@ public SSTableSliceIterator(SSTableReader sstable, DecoratedKey key, ColumnSlice
public SSTableSliceIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, ColumnSlice[] slices, boolean reversed, IndexedEntry indexEntry)
{
this.key = key;
this.indexEntry = indexEntry;
reader = createReader(sstable, indexEntry, file, slices, reversed);
this.indexEntry = indexEntry;
}

private static OnDiskAtomIterator createReader(SSTableReader sstable, IndexedEntry indexEntry, FileDataInput file, ColumnSlice[] slices, boolean reversed)
@@ -96,6 +96,7 @@ public void remove()
throw new UnsupportedOperationException();
}

@Override
public void close() throws IOException
{
if (indexEntry != null)
@@ -840,7 +840,7 @@ private void doCleanupOne(final ColumnFamilyStore cfs, SSTableReader sstable, Cl
if (row == null)
continue;
AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(row));
if (writer.append(compactedRow) != null)
if (writer.append(compactedRow))
totalkeysWritten++;
}

@@ -44,7 +44,6 @@
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.concurrent.Refs;

public class CompactionTask extends AbstractCompactionTask
@@ -194,7 +193,7 @@ public boolean apply(SSTableReader sstable)
throw new CompactionInterruptedException(ci.getCompactionInfo());

AbstractCompactedRow row = iter.next();
if (writer.append(row) != null)
if (writer.append(row))
{
totalKeysWritten++;
if (newSSTableSegmentThresholdReached(writer.currentWriter()))
