From 6524fa0faa6327307adc1567f1762ec6807c3981 Mon Sep 17 00:00:00 2001 From: AntiO2 Date: Wed, 29 Apr 2026 05:19:47 +0800 Subject: [PATCH 1/2] Fix RocksDB prefix seek for versioned index keys --- pixels-index/pixels-index-rocksdb/pom.xml | 2 - .../pixels/index/rocksdb/RocksDBFactory.java | 4 +- .../index/rocksdb/TestRocksDBIndex.java | 50 ++++++++++++++++++- pom.xml | 2 + 4 files changed, 54 insertions(+), 4 deletions(-) diff --git a/pixels-index/pixels-index-rocksdb/pom.xml b/pixels-index/pixels-index-rocksdb/pom.xml index 2e019d349f..7cf0f69986 100644 --- a/pixels-index/pixels-index-rocksdb/pom.xml +++ b/pixels-index/pixels-index-rocksdb/pom.xml @@ -15,8 +15,6 @@ 8 8 UTF-8 - - 10.2.1 diff --git a/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBFactory.java b/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBFactory.java index a6441818c4..5b1fd91ed9 100644 --- a/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBFactory.java +++ b/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBFactory.java @@ -189,7 +189,9 @@ private static ColumnFamilyDescriptor createCFDescriptor(byte[] name, Integer ke int fixedLengthPrefix = Integer.parseInt(config.getProperty("index.rocksdb.prefix.length")); if (keyLen != null) { - fixedLengthPrefix = keyLen + Long.BYTES; // key buffer + index id + // Prefix must only cover the logical lookup key. + // It must not include the encoded timestamp suffix. + fixedLengthPrefix = keyLen + (multiCF ? 0 : Long.BYTES); } CompactionStyle compactionStyle = CompactionStyle.valueOf(config.getProperty("index.rocksdb.compaction.style")); diff --git a/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRocksDBIndex.java b/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRocksDBIndex.java index 67ed0306eb..6156901322 100644 --- a/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRocksDBIndex.java +++ b/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRocksDBIndex.java @@ -25,14 +25,18 @@ import io.pixelsdb.pixels.common.exception.SinglePointIndexException; import io.pixelsdb.pixels.common.index.IndexOption; import io.pixelsdb.pixels.common.index.SinglePointIndex; +import io.pixelsdb.pixels.common.utils.ConfigFactory; +import io.pixelsdb.pixels.common.utils.IndexUtils; import io.pixelsdb.pixels.index.IndexProto; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -43,6 +47,8 @@ public class TestRocksDBIndex { + private static final boolean MULTI_CF = + Boolean.parseBoolean(ConfigFactory.Instance().getProperty("index.rocksdb.multicf")); private RocksDB rocksDB; private static final long TABLE_ID = 100L; private static final long INDEX_ID = 100L; @@ -172,6 +178,48 @@ public void testGetUniqueRowId() throws SinglePointIndexException assertEquals(rowId2, result, "getUniqueRowId should return the rowId of the latest timestamp entry"); } + @Test + public void testSeekFindsNextVersionWithSameLogicalPrefix() throws Exception + { + byte[] key = "testSeekSameLogicalPrefix".getBytes(); + long newerTimestamp = 1000L; + long olderTimestamp = newerTimestamp + 1000L; + + IndexProto.IndexKey newerKey = IndexProto.IndexKey.newBuilder() + .setIndexId(INDEX_ID) + .setKey(ByteString.copyFrom(key)) + .setTimestamp(newerTimestamp) + .build(); + IndexProto.IndexKey olderKey = IndexProto.IndexKey.newBuilder() + .setIndexId(INDEX_ID) + .setKey(ByteString.copyFrom(key)) + .setTimestamp(olderTimestamp) + .build(); + + uniqueIndex.putEntry(olderKey, 1L); + + ByteBuffer seekKey = toKeyBuffer(newerKey); + ByteBuffer expectedKey = toKeyBuffer(olderKey); + + ReadOptions readOptions = RocksDBThreadResources.getReadOptions(); + readOptions.setPrefixSameAsStart(true) + .setTotalOrderSeek(false) + .setVerifyChecksums(false); + + String cfName = IndexUtils.getCFName(TABLE_ID, INDEX_ID, 0, MULTI_CF); + ColumnFamilyHandle columnFamilyHandle = RocksDBFactory.getAllCfHandles().get(cfName); + assertNotNull(columnFamilyHandle, "column family handle should exist"); + + try (RocksIterator iterator = rocksDB.newIterator(columnFamilyHandle, readOptions)) + { + iterator.seek(seekKey); + assertTrue(iterator.isValid(), "seek should land on the next key with the same logical prefix"); + byte[] expectedBytes = new byte[expectedKey.remaining()]; + expectedKey.get(expectedBytes); + assertArrayEquals(expectedBytes, iterator.key()); + } + } + @Test public void testGetRowIds() throws SinglePointIndexException { @@ -396,4 +444,4 @@ public void benchmarkDeleteEntry() throws SinglePointIndexException double durationMs = (end - start) / 1_000_000.0; System.out.printf("Deleted %,d entries in %.2f ms (%.2f ops/sec)%n", count, durationMs, count * 1000.0 / durationMs); } -} \ No newline at end of file +} diff --git a/pom.xml b/pom.xml index 78011c6a56..92e8d7c814 100644 --- a/pom.xml +++ b/pom.xml @@ -143,6 +143,8 @@ 1.7.36 + 10.2.1 + 4.13.2 1.8.2 From 702f903b70db42f287d8beb827c19f892ccfecec Mon Sep 17 00:00:00 2001 From: AntiO2 Date: Fri, 1 May 2026 23:08:15 +0800 Subject: [PATCH 2/2] Expand test coverage for RocksDB prefix seek with versioned index keys Validate prefix seek correctly lands on the closest stored version across multiple timestamp scenarios. Tune prefix length and enable multiCF to support multi-column-family index lookup. --- .../src/main/resources/pixels.properties | 4 +- .../index/rocksdb/TestRocksDBIndex.java | 88 ++++++++++++------- 2 files changed, 58 insertions(+), 34 deletions(-) diff --git a/pixels-common/src/main/resources/pixels.properties b/pixels-common/src/main/resources/pixels.properties index cdfad1b847..b9f81ddce6 100644 --- a/pixels-common/src/main/resources/pixels.properties +++ b/pixels-common/src/main/resources/pixels.properties @@ -367,7 +367,7 @@ index.rocksdb.target.file.size.base=67108864 # rocksdb file size multiplier (default to 1) index.rocksdb.target.file.size.multiplier=1 # rocksdb key fixed prefix length -index.rocksdb.prefix.length=12 +index.rocksdb.prefix.length=4 # rocksdb max subcompactions index.rocksdb.max.subcompactions=1 # rocksdb compression type (e.g. NO_COMPRESSION, SNAPPY_COMPRESSION, ZLIB_COMPRESSION, BZ2_COMPRESSION, LZ4_COMPRESSION, LZ4HC_COMPRESSION, ZSTD_COMPRESSION) @@ -391,7 +391,7 @@ index.cache.capacity=10000000 # The expiration time (in seconds) of cache entries index.cache.expiration.seconds=3600 # whether each index corresponds to its own column family -index.rocksdb.multicf=false +index.rocksdb.multicf=true index.bucket.num=128 # the directory where the sqlite files of main index are stored, each main index is stored as a sqlite file index.sqlite.path=/tmp/sqlite diff --git a/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRocksDBIndex.java b/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRocksDBIndex.java index 6156901322..4212bf054f 100644 --- a/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRocksDBIndex.java +++ b/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRocksDBIndex.java @@ -37,33 +37,40 @@ import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import static io.pixelsdb.pixels.index.rocksdb.RocksDBIndex.toBuffer; import static io.pixelsdb.pixels.index.rocksdb.RocksDBIndex.toKeyBuffer; +import static io.pixelsdb.pixels.index.rocksdb.RocksDBIndex.startsWith; import static org.junit.jupiter.api.Assertions.*; public class TestRocksDBIndex { private static final boolean MULTI_CF = Boolean.parseBoolean(ConfigFactory.Instance().getProperty("index.rocksdb.multicf")); + private static final Logger log = LoggerFactory.getLogger(TestRocksDBIndex.class); private RocksDB rocksDB; private static final long TABLE_ID = 100L; private static final long INDEX_ID = 100L; + private static final int VNODE_ID = 0; private SinglePointIndex uniqueIndex; private SinglePointIndex nonUniqueIndex; - + private ColumnFamilyHandle columnFamilyHandle; @BeforeEach public void setUp() throws RocksDBException,SinglePointIndexException { IndexOption option = IndexOption.builder() - .vNodeId(0) + .vNodeId(VNODE_ID) .build(); uniqueIndex = new RocksDBIndex(TABLE_ID, INDEX_ID, true, option); nonUniqueIndex = new RocksDBIndex(TABLE_ID, INDEX_ID + 1, false, option); rocksDB = RocksDBFactory.getRocksDB(); + columnFamilyHandle = RocksDBFactory.getOrCreateColumnFamily(TABLE_ID, INDEX_ID, VNODE_ID); } @AfterEach @@ -83,7 +90,7 @@ public void tearDown() throws SinglePointIndexException public void testPutEntry() throws RocksDBException, SinglePointIndexException { // Create Entry - byte[] key = "testPutEntry".getBytes(); + byte[] key = ByteBuffer.allocate(4).putInt(1).array(); long timestamp = 1000L; long rowId = 100L; @@ -97,7 +104,7 @@ public void testPutEntry() throws RocksDBException, SinglePointIndexException ByteBuffer valueBuffer = RocksDBThreadResources.getValueBuffer(); ReadOptions readOptions = new ReadOptions(); // Assert index has been written to rocksDB - int ret = rocksDB.get(readOptions, keyBuffer, valueBuffer); + int ret = rocksDB.get(columnFamilyHandle, readOptions, keyBuffer, valueBuffer); assertTrue(ret != RocksDB.NOT_FOUND); long storedRowId = valueBuffer.getLong(); @@ -181,42 +188,44 @@ public void testGetUniqueRowId() throws SinglePointIndexException @Test public void testSeekFindsNextVersionWithSameLogicalPrefix() throws Exception { - byte[] key = "testSeekSameLogicalPrefix".getBytes(); - long newerTimestamp = 1000L; - long olderTimestamp = newerTimestamp + 1000L; + // Use Default Prefix Len = 4 + byte[] key = ByteBuffer.allocate(4).putInt(7).array(); + long[] storedTimestamps = {1L, 3L, 5L, 7L, 9L}; + long[] seekTimestamps = {1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L}; + long[] expectedTimestamps = {1L, 1L, 3L, 3L, 5L, 5L, 7L, 7L, 9L, 9L}; - IndexProto.IndexKey newerKey = IndexProto.IndexKey.newBuilder() - .setIndexId(INDEX_ID) - .setKey(ByteString.copyFrom(key)) - .setTimestamp(newerTimestamp) - .build(); - IndexProto.IndexKey olderKey = IndexProto.IndexKey.newBuilder() - .setIndexId(INDEX_ID) - .setKey(ByteString.copyFrom(key)) - .setTimestamp(olderTimestamp) - .build(); - - uniqueIndex.putEntry(olderKey, 1L); - - ByteBuffer seekKey = toKeyBuffer(newerKey); - ByteBuffer expectedKey = toKeyBuffer(olderKey); + for (long timestamp : storedTimestamps) + { + IndexProto.IndexKey storedKey = IndexProto.IndexKey.newBuilder() + .setIndexId(INDEX_ID) + .setKey(ByteString.copyFrom(key)) + .setTimestamp(timestamp) + .build(); + uniqueIndex.putEntry(storedKey, timestamp); + } ReadOptions readOptions = RocksDBThreadResources.getReadOptions(); readOptions.setPrefixSameAsStart(true) .setTotalOrderSeek(false) .setVerifyChecksums(false); - String cfName = IndexUtils.getCFName(TABLE_ID, INDEX_ID, 0, MULTI_CF); - ColumnFamilyHandle columnFamilyHandle = RocksDBFactory.getAllCfHandles().get(cfName); - assertNotNull(columnFamilyHandle, "column family handle should exist"); - - try (RocksIterator iterator = rocksDB.newIterator(columnFamilyHandle, readOptions)) + for (int i = 0; i < seekTimestamps.length; i++) { - iterator.seek(seekKey); - assertTrue(iterator.isValid(), "seek should land on the next key with the same logical prefix"); - byte[] expectedBytes = new byte[expectedKey.remaining()]; - expectedKey.get(expectedBytes); - assertArrayEquals(expectedBytes, iterator.key()); + long seekTimestamp = seekTimestamps[i]; + long expectedTimestamp = expectedTimestamps[i]; + ByteBuffer seekKey = toKeyBuffer(indexKey(key, seekTimestamp)); + + try (RocksIterator iterator = rocksDB.newIterator(columnFamilyHandle, readOptions)) + { + iterator.seek(seekKey); + assertTrue(iterator.isValid(), "seek should find a version for timestamp " + seekTimestamp); + assertTrue(startsWith(ByteBuffer.wrap(iterator.key()), seekKey), + "seek should remain within the same logical prefix for timestamp " + seekTimestamp); + long getTs = extractTimestampFromUniqueKey(iterator.key()); + System.out.println("Timestamp: " + getTs); + assertEquals(expectedTimestamp, getTs, + "seek should land on the closest stored version for timestamp " + seekTimestamp); + } } } @@ -255,6 +264,21 @@ public void testGetRowIds() throws SinglePointIndexException assertTrue(rowIds.containsAll(result) && result.containsAll(rowIds), "getRowIds should return the rowId of all entries"); } + private static IndexProto.IndexKey indexKey(byte[] key, long timestamp) + { + return IndexProto.IndexKey.newBuilder() + .setIndexId(INDEX_ID) + .setKey(ByteString.copyFrom(key)) + .setTimestamp(timestamp) + .build(); + } + + private static long extractTimestampFromUniqueKey(byte[] encodedKey) + { + ByteBuffer keyBuffer = ByteBuffer.wrap(encodedKey); + return Long.MAX_VALUE - keyBuffer.getLong(encodedKey.length - Long.BYTES); + } + @Test public void testDeleteEntry() throws SinglePointIndexException {