In PR apache#5285,
the raw index writer format was changed to use an 8-byte offset
for each chunk in the file header, and the writer version
was bumped to 3. This was done to support indexes larger than 2GB.
The change is backward compatible: existing/old segments written
with 4-byte offsets can still be read.

While there is no problem with the change itself, it prevents rollback.
If any orthogonal issue comes up while rolling out a release,
we can't roll back to an older Pinot release, since segments already
generated with 8-byte offsets can't be read by the old code.

This config option is a temporary aid for the internal roll-out:
it keeps the 8-byte format disabled by default, so rollback remains
possible if anything goes wrong. We plan to remove the option in the
next couple of weeks, after the internal rollout.
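
As a rough illustration of how the knob is meant to flow through segment creation, here is a
minimal, self-contained sketch (not the exact Pinot wiring): the writer version is looked up per
column from the column properties, falls back to the old v2 format, and decides whether each chunk
offset in the file header is written as a 4-byte int or an 8-byte long. The literal property key
"rawIndexWriterVersion" and the class name below are assumptions used only for illustration; in the
code the key is referenced as FieldConfig.RAW_INDEX_WRITER_VERSION.

import java.util.HashMap;
import java.util.Map;

// Sketch of the version selection introduced here. rawIndexWriterVersion(...) mirrors the
// helper added to SegmentColumnarIndexCreator; the property key string is an assumed value
// for FieldConfig.RAW_INDEX_WRITER_VERSION.
public class RawIndexWriterVersionSketch {
  static final int DEFAULT_VERSION = 2;  // v1/v2 header: 4-byte chunk offsets
  static final int CURRENT_VERSION = 3;  // v3 header: 8-byte chunk offsets (supports > 2GB raw indexes)

  static int rawIndexWriterVersion(String column, Map<String, Map<String, String>> columnProperties) {
    if (columnProperties != null && columnProperties.get(column) != null) {
      String version = columnProperties.get(column).get("rawIndexWriterVersion");
      if (version != null) {
        return Integer.parseInt(version);
      }
    }
    // Columns without an explicit override keep the old format, so rollback stays possible.
    return DEFAULT_VERSION;
  }

  static int headerEntryChunkOffsetSize(int version) {
    // v1/v2 store each chunk offset as an int; v3 stores it as a long.
    return version == CURRENT_VERSION ? Long.BYTES : Integer.BYTES;
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("rawIndexWriterVersion", "3");
    Map<String, Map<String, String>> columnProperties = new HashMap<>();
    columnProperties.put("largeRawColumn", props);

    System.out.println(rawIndexWriterVersion("largeRawColumn", columnProperties)); // 3 -> 8-byte offsets
    System.out.println(rawIndexWriterVersion("otherColumn", columnProperties));    // 2 -> 4-byte offsets
    System.out.println(headerEntryChunkOffsetSize(CURRENT_VERSION));               // 8
  }
}

With the version resolved this way, each raw index creator simply passes it down to the chunk
writer constructor, as the diff below shows.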
Siddharth Teotia committed Jun 5, 2020
1 parent c152f18 commit e51d7e3
Showing 12 changed files with 283 additions and 132 deletions.
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.io.writer.impl.v1;

import com.google.common.base.Preconditions;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -36,6 +37,9 @@
* Base class for fixed and variable byte writer implementations.
*/
public abstract class BaseChunkSingleValueWriter implements SingleColumnSingleValueWriter {
public static final int DEFAULT_VERSION = 2;
public static final int CURRENT_VERSION = 3;

private static final Logger LOGGER = LoggerFactory.getLogger(BaseChunkSingleValueWriter.class);
private static final int FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V1V2 = Integer.BYTES;
private static final int FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V3 = Long.BYTES;
@@ -60,12 +64,13 @@ public abstract class BaseChunkSingleValueWriter implements SingleColumnSingleVa
* @param numDocsPerChunk Number of docs per data chunk
* @param chunkSize Size of chunk
* @param sizeOfEntry Size of entry (in bytes), max size for variable byte implementation.
* @param version Version of file
* @param version format version used to determine whether to use 8 or 4 byte chunk offsets
* @throws FileNotFoundException
*/
protected BaseChunkSingleValueWriter(File file, ChunkCompressorFactory.CompressionType compressionType, int totalDocs,
int numDocsPerChunk, int chunkSize, int sizeOfEntry, int version)
throws FileNotFoundException {
Preconditions.checkArgument(version == DEFAULT_VERSION || version == CURRENT_VERSION);
_chunkSize = chunkSize;
_chunkCompressor = ChunkCompressorFactory.getCompressor(compressionType);
_headerEntryChunkOffsetSize = getHeaderEntryChunkOffsetSize(version);
@@ -53,8 +53,6 @@
*/
@NotThreadSafe
public class FixedByteChunkSingleValueWriter extends BaseChunkSingleValueWriter {

private static final int CURRENT_VERSION = 3;
private int _chunkDataOffset;

/**
@@ -64,15 +62,15 @@ public class FixedByteChunkSingleValueWriter extends BaseChunkSingleValueWriter
* @param compressionType Type of compression to use.
* @param totalDocs Total number of docs to write.
* @param numDocsPerChunk Number of documents per chunk.
* @param sizeOfEntry Size of entry (in bytes).
* @param sizeOfEntry Size of entry (in bytes)
* @param writerVersion writer version used to determine whether to use 8 or 4 byte chunk offsets
* @throws FileNotFoundException Throws {@link FileNotFoundException} if the specified file is not found.
*/
public FixedByteChunkSingleValueWriter(File file, ChunkCompressorFactory.CompressionType compressionType,
int totalDocs, int numDocsPerChunk, int sizeOfEntry)
int totalDocs, int numDocsPerChunk, int sizeOfEntry, int writerVersion)
throws FileNotFoundException {

super(file, compressionType, totalDocs, numDocsPerChunk, (sizeOfEntry * numDocsPerChunk), sizeOfEntry,
CURRENT_VERSION);
writerVersion);
_chunkDataOffset = 0;
}

@@ -53,7 +53,6 @@
*/
@NotThreadSafe
public class VarByteChunkSingleValueWriter extends BaseChunkSingleValueWriter {
private static final int CURRENT_VERSION = 3;
public static final int CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE = Integer.BYTES;

private final int _chunkHeaderSize;
@@ -67,16 +66,16 @@ public class VarByteChunkSingleValueWriter extends BaseChunkSingleValueWriter {
* @param compressionType Type of compression to use.
* @param totalDocs Total number of docs to write.
* @param numDocsPerChunk Number of documents per chunk.
* @param lengthOfLongestEntry Length of longest entry (in bytes).
* @param lengthOfLongestEntry Length of longest entry (in bytes)
* @param writerVersion writer version used to determine whether to use 8 or 4 byte chunk offsets
* @throws FileNotFoundException Throws {@link FileNotFoundException} if the specified file is not found.
*/
public VarByteChunkSingleValueWriter(File file, ChunkCompressorFactory.CompressionType compressionType, int totalDocs,
int numDocsPerChunk, int lengthOfLongestEntry)
int numDocsPerChunk, int lengthOfLongestEntry, int writerVersion)
throws FileNotFoundException {

super(file, compressionType, totalDocs, numDocsPerChunk,
numDocsPerChunk * (CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE + lengthOfLongestEntry), // chunkSize
lengthOfLongestEntry, CURRENT_VERSION);
lengthOfLongestEntry, writerVersion);

_chunkHeaderOffset = 0;
_chunkHeaderSize = numDocsPerChunk * CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
@@ -32,6 +32,7 @@
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.core.io.compression.ChunkCompressorFactory;
import org.apache.pinot.core.io.writer.impl.v1.BaseChunkSingleValueWriter;
import org.apache.pinot.core.segment.creator.SingleValueRawIndexCreator;
import org.apache.pinot.core.segment.creator.impl.SegmentColumnarIndexCreator;
import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -200,7 +201,7 @@ private void convertColumn(FieldSpec fieldSpec)
int lengthOfLongestEntry = _originalSegmentMetadata.getColumnMetadataFor(columnName).getColumnMaxLength();
try (SingleValueRawIndexCreator rawIndexCreator = SegmentColumnarIndexCreator
.getRawIndexCreatorForColumn(_convertedIndexDir, ChunkCompressorFactory.CompressionType.SNAPPY, columnName,
dataType, _originalSegmentMetadata.getTotalDocs(), lengthOfLongestEntry)) {
dataType, _originalSegmentMetadata.getTotalDocs(), lengthOfLongestEntry, false, BaseChunkSingleValueWriter.DEFAULT_VERSION)) {
BlockSingleValIterator iterator = (BlockSingleValIterator) dataSource.nextBlock().getBlockValueSet().iterator();
int docId = 0;
while (iterator.hasNext()) {
@@ -35,6 +35,7 @@
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.io.compression.ChunkCompressorFactory;
import org.apache.pinot.core.io.util.PinotDataBitSet;
import org.apache.pinot.core.io.writer.impl.v1.BaseChunkSingleValueWriter;
import org.apache.pinot.core.segment.creator.ColumnIndexCreationInfo;
import org.apache.pinot.core.segment.creator.ForwardIndexCreator;
import org.apache.pinot.core.segment.creator.InvertedIndexCreator;
@@ -195,9 +196,10 @@ public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreatio

// Initialize forward index creator
boolean deriveNumDocsPerChunk = shouldDeriveNumDocsPerChunk(columnName, segmentCreationSpec.getColumnProperties());
int writerVersion = rawIndexWriterVersion(columnName, segmentCreationSpec.getColumnProperties());
_forwardIndexCreatorMap.put(columnName,
getRawIndexCreatorForColumn(_indexDir, compressionType, columnName, fieldSpec.getDataType(), totalDocs,
indexCreationInfo.getLengthOfLongestEntry(), deriveNumDocsPerChunk));
indexCreationInfo.getLengthOfLongestEntry(), deriveNumDocsPerChunk, writerVersion));

// Initialize text index creator
if (_textIndexColumns.contains(columnName)) {
@@ -223,6 +225,18 @@ public static boolean shouldDeriveNumDocsPerChunk(String columnName, Map<String,
return false;
}

public static int rawIndexWriterVersion(String columnName, Map<String, Map<String, String>> columnProperties) {
if (columnProperties != null && columnProperties.get(columnName) != null) {
Map<String, String> properties = columnProperties.get(columnName);
String version = properties.get(FieldConfig.RAW_INDEX_WRITER_VERSION);
if (version == null) {
return BaseChunkSingleValueWriter.DEFAULT_VERSION;
}
return Integer.parseInt(version);
}
return BaseChunkSingleValueWriter.DEFAULT_VERSION;
}

/**
* Helper method that returns compression type to use based on segment creation spec and field type.
* <ul>
@@ -542,44 +556,43 @@ public static void removeColumnMetadataInfo(PropertiesConfiguration properties,
* @param column Column name
* @param totalDocs Total number of documents to index
* @param lengthOfLongestEntry Length of longest entry
* @return
* @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows per chunk
* @param writerVersion version to use for the raw index writer
* @return raw index creator
* @throws IOException
*/
public static SingleValueRawIndexCreator getRawIndexCreatorForColumn(File file,
ChunkCompressorFactory.CompressionType compressionType, String column, FieldSpec.DataType dataType, int totalDocs,
int lengthOfLongestEntry)
throws IOException {
return getRawIndexCreatorForColumn(file, compressionType, column, dataType, totalDocs, lengthOfLongestEntry, false);
}

public static SingleValueRawIndexCreator getRawIndexCreatorForColumn(File file,
ChunkCompressorFactory.CompressionType compressionType, String column, FieldSpec.DataType dataType, int totalDocs,
int lengthOfLongestEntry, boolean deriveNumDocsPerChunk)
int lengthOfLongestEntry, boolean deriveNumDocsPerChunk, int writerVersion)
throws IOException {

SingleValueRawIndexCreator indexCreator;
switch (dataType) {
case INT:
indexCreator = new SingleValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, Integer.BYTES);
indexCreator = new SingleValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, Integer.BYTES,
writerVersion);
break;

case LONG:
indexCreator = new SingleValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, Long.BYTES);
indexCreator = new SingleValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, Long.BYTES,
writerVersion);
break;

case FLOAT:
indexCreator = new SingleValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, Float.BYTES);
indexCreator = new SingleValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, Float.BYTES,
writerVersion);
break;

case DOUBLE:
indexCreator = new SingleValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, Double.BYTES);
indexCreator = new SingleValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, Double.BYTES,
writerVersion);
break;

case STRING:
case BYTES:
indexCreator =
new SingleValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, lengthOfLongestEntry,
deriveNumDocsPerChunk);
deriveNumDocsPerChunk, writerVersion);
break;

default:
@@ -22,6 +22,7 @@
import java.io.IOException;
import org.apache.pinot.core.io.compression.ChunkCompressorFactory;
import org.apache.pinot.core.io.writer.impl.FixedByteSingleValueMultiColWriter;
import org.apache.pinot.core.io.writer.impl.v1.BaseChunkSingleValueWriter;
import org.apache.pinot.core.io.writer.impl.v1.FixedByteChunkSingleValueWriter;
import org.apache.pinot.core.segment.creator.BaseSingleValueRawIndexCreator;
import org.apache.pinot.core.segment.creator.impl.V1Constants;
@@ -52,9 +53,26 @@ public class SingleValueFixedByteRawIndexCreator extends BaseSingleValueRawIndex
public SingleValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressorFactory.CompressionType compressionType,
String column, int totalDocs, int sizeOfEntry)
throws IOException {
this(baseIndexDir, compressionType, column, totalDocs, sizeOfEntry, BaseChunkSingleValueWriter.DEFAULT_VERSION);
}

/**
* Constructor for the class
*
* @param baseIndexDir Index directory
* @param compressionType Type of compression to use
* @param column Name of column to index
* @param totalDocs Total number of documents to index
* @param sizeOfEntry Size of entry (in bytes)
* @param writerVersion writer format version
* @throws IOException
*/
public SingleValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressorFactory.CompressionType compressionType,
String column, int totalDocs, int sizeOfEntry, int writerVersion)
throws IOException {
File file = new File(baseIndexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
_indexWriter =
new FixedByteChunkSingleValueWriter(file, compressionType, totalDocs, NUM_DOCS_PER_CHUNK, sizeOfEntry);
new FixedByteChunkSingleValueWriter(file, compressionType, totalDocs, NUM_DOCS_PER_CHUNK, sizeOfEntry, writerVersion);
}

@Override
@@ -22,6 +22,7 @@
import java.io.File;
import java.io.IOException;
import org.apache.pinot.core.io.compression.ChunkCompressorFactory;
import org.apache.pinot.core.io.writer.impl.v1.BaseChunkSingleValueWriter;
import org.apache.pinot.core.io.writer.impl.v1.VarByteChunkSingleValueWriter;
import org.apache.pinot.core.segment.creator.BaseSingleValueRawIndexCreator;
import org.apache.pinot.core.segment.creator.impl.V1Constants;
@@ -33,18 +34,40 @@ public class SingleValueVarByteRawIndexCreator extends BaseSingleValueRawIndexCr

private final VarByteChunkSingleValueWriter _indexWriter;

/**
* Create a var-byte raw index creator for the given column
* @param baseIndexDir Index directory
* @param compressionType Type of compression to use
* @param column Name of column to index
* @param totalDocs Total number of documents to index
* @param maxLength length of longest entry (in bytes)
* @throws IOException
*/
public SingleValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressorFactory.CompressionType compressionType,
String column, int totalDocs, int maxLength)
throws IOException {
this(baseIndexDir, compressionType, column, totalDocs, maxLength, false);
this(baseIndexDir, compressionType, column, totalDocs, maxLength, false,
BaseChunkSingleValueWriter.DEFAULT_VERSION);
}

/**
* Create a var-byte raw index creator for the given column
* @param baseIndexDir Index directory
* @param compressionType Type of compression to use
* @param column Name of column to index
* @param totalDocs Total number of documents to index
* @param maxLength length of longest entry (in bytes)
* @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per chunk
* @param writerVersion writer format version
* @throws IOException
*/
public SingleValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressorFactory.CompressionType compressionType,
String column, int totalDocs, int maxLength, boolean deriveNumDocsPerChunk)
String column, int totalDocs, int maxLength, boolean deriveNumDocsPerChunk, int writerVersion)
throws IOException {
File file = new File(baseIndexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
int numDocsPerChunk = deriveNumDocsPerChunk ? getNumDocsPerChunk(maxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
_indexWriter = new VarByteChunkSingleValueWriter(file, compressionType, totalDocs, numDocsPerChunk, maxLength);
_indexWriter =
new VarByteChunkSingleValueWriter(file, compressionType, totalDocs, numDocsPerChunk, maxLength, writerVersion);
}

@VisibleForTesting
@@ -363,9 +363,10 @@ void createV1ForwardIndexForTextIndex(String column, IndexLoadingConfig indexLoa
int dictionaryElementSize = 0;

boolean deriveNumDocsPerChunk = SegmentColumnarIndexCreator.shouldDeriveNumDocsPerChunk(column, indexLoadingConfig.getColumnProperties());
int writerVersion = SegmentColumnarIndexCreator.rawIndexWriterVersion(column, indexLoadingConfig.getColumnProperties());
SingleValueVarByteRawIndexCreator rawIndexCreator =
new SingleValueVarByteRawIndexCreator(_indexDir, ChunkCompressorFactory.CompressionType.SNAPPY, column,
totalDocs, lengthOfLongestEntry, deriveNumDocsPerChunk);
totalDocs, lengthOfLongestEntry, deriveNumDocsPerChunk, writerVersion);

for (int docId = 0; docId < totalDocs; docId++) {
rawIndexCreator.index(docId, defaultValue);
