Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pixels-common/src/main/resources/pixels.properties
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ index.rocksdb.target.file.size.base=67108864
# rocksdb file size multiplier (default to 1)
index.rocksdb.target.file.size.multiplier=1
# rocksdb key fixed prefix length
index.rocksdb.prefix.length=12
index.rocksdb.prefix.length=4
# rocksdb max subcompactions
index.rocksdb.max.subcompactions=1
# rocksdb compression type (e.g. NO_COMPRESSION, SNAPPY_COMPRESSION, ZLIB_COMPRESSION, BZ2_COMPRESSION, LZ4_COMPRESSION, LZ4HC_COMPRESSION, ZSTD_COMPRESSION)
Expand All @@ -391,7 +391,7 @@ index.cache.capacity=10000000
# The expiration time (in seconds) of cache entries
index.cache.expiration.seconds=3600
# whether each index corresponds to its own column family
index.rocksdb.multicf=false
index.rocksdb.multicf=true
index.bucket.num=128
# the directory where the sqlite files of main index are stored, each main index is stored as a sqlite file
index.sqlite.path=/tmp/sqlite
Expand Down
2 changes: 0 additions & 2 deletions pixels-index/pixels-index-rocksdb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<dep.rocksdb.version>10.2.1</dep.rocksdb.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,9 @@ private static ColumnFamilyDescriptor createCFDescriptor(byte[] name, Integer ke
int fixedLengthPrefix = Integer.parseInt(config.getProperty("index.rocksdb.prefix.length"));
if (keyLen != null)
{
fixedLengthPrefix = keyLen + Long.BYTES; // key buffer + index id
// Prefix must only cover the logical lookup key.
// It must not include the encoded timestamp suffix.
fixedLengthPrefix = keyLen + (multiCF ? 0 : Long.BYTES);
}
CompactionStyle compactionStyle = CompactionStyle.valueOf(config.getProperty("index.rocksdb.compaction.style"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,39 +25,52 @@
import io.pixelsdb.pixels.common.exception.SinglePointIndexException;
import io.pixelsdb.pixels.common.index.IndexOption;
import io.pixelsdb.pixels.common.index.SinglePointIndex;
import io.pixelsdb.pixels.common.utils.ConfigFactory;
import io.pixelsdb.pixels.common.utils.IndexUtils;
import io.pixelsdb.pixels.index.IndexProto;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import static io.pixelsdb.pixels.index.rocksdb.RocksDBIndex.toBuffer;
import static io.pixelsdb.pixels.index.rocksdb.RocksDBIndex.toKeyBuffer;
import static io.pixelsdb.pixels.index.rocksdb.RocksDBIndex.startsWith;
import static org.junit.jupiter.api.Assertions.*;

public class TestRocksDBIndex
{
private static final boolean MULTI_CF =
Boolean.parseBoolean(ConfigFactory.Instance().getProperty("index.rocksdb.multicf"));
private static final Logger log = LoggerFactory.getLogger(TestRocksDBIndex.class);
private RocksDB rocksDB;
private static final long TABLE_ID = 100L;
private static final long INDEX_ID = 100L;
private static final int VNODE_ID = 0;
private SinglePointIndex uniqueIndex;
private SinglePointIndex nonUniqueIndex;

private ColumnFamilyHandle columnFamilyHandle;
@BeforeEach
public void setUp() throws RocksDBException,SinglePointIndexException
{
IndexOption option = IndexOption.builder()
.vNodeId(0)
.vNodeId(VNODE_ID)
.build();
uniqueIndex = new RocksDBIndex(TABLE_ID, INDEX_ID, true, option);
nonUniqueIndex = new RocksDBIndex(TABLE_ID, INDEX_ID + 1, false, option);
rocksDB = RocksDBFactory.getRocksDB();
columnFamilyHandle = RocksDBFactory.getOrCreateColumnFamily(TABLE_ID, INDEX_ID, VNODE_ID);
}

@AfterEach
Expand All @@ -77,7 +90,7 @@ public void tearDown() throws SinglePointIndexException
public void testPutEntry() throws RocksDBException, SinglePointIndexException
{
// Create Entry
byte[] key = "testPutEntry".getBytes();
byte[] key = ByteBuffer.allocate(4).putInt(1).array();
long timestamp = 1000L;
long rowId = 100L;

Expand All @@ -91,7 +104,7 @@ public void testPutEntry() throws RocksDBException, SinglePointIndexException
ByteBuffer valueBuffer = RocksDBThreadResources.getValueBuffer();
ReadOptions readOptions = new ReadOptions();
// Assert index has been written to rocksDB
int ret = rocksDB.get(readOptions, keyBuffer, valueBuffer);
int ret = rocksDB.get(columnFamilyHandle, readOptions, keyBuffer, valueBuffer);
assertTrue(ret != RocksDB.NOT_FOUND);

long storedRowId = valueBuffer.getLong();
Expand Down Expand Up @@ -172,6 +185,50 @@ public void testGetUniqueRowId() throws SinglePointIndexException
assertEquals(rowId2, result, "getUniqueRowId should return the rowId of the latest timestamp entry");
}

@Test
public void testSeekFindsNextVersionWithSameLogicalPrefix() throws Exception
{
// Use Default Prefix Len = 4
byte[] key = ByteBuffer.allocate(4).putInt(7).array();
long[] storedTimestamps = {1L, 3L, 5L, 7L, 9L};
long[] seekTimestamps = {1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L};
long[] expectedTimestamps = {1L, 1L, 3L, 3L, 5L, 5L, 7L, 7L, 9L, 9L};

for (long timestamp : storedTimestamps)
{
IndexProto.IndexKey storedKey = IndexProto.IndexKey.newBuilder()
.setIndexId(INDEX_ID)
.setKey(ByteString.copyFrom(key))
.setTimestamp(timestamp)
.build();
uniqueIndex.putEntry(storedKey, timestamp);
}

ReadOptions readOptions = RocksDBThreadResources.getReadOptions();
readOptions.setPrefixSameAsStart(true)
.setTotalOrderSeek(false)
.setVerifyChecksums(false);

for (int i = 0; i < seekTimestamps.length; i++)
{
long seekTimestamp = seekTimestamps[i];
long expectedTimestamp = expectedTimestamps[i];
ByteBuffer seekKey = toKeyBuffer(indexKey(key, seekTimestamp));

try (RocksIterator iterator = rocksDB.newIterator(columnFamilyHandle, readOptions))
{
iterator.seek(seekKey);
assertTrue(iterator.isValid(), "seek should find a version for timestamp " + seekTimestamp);
assertTrue(startsWith(ByteBuffer.wrap(iterator.key()), seekKey),
"seek should remain within the same logical prefix for timestamp " + seekTimestamp);
long getTs = extractTimestampFromUniqueKey(iterator.key());
System.out.println("Timestamp: " + getTs);
assertEquals(expectedTimestamp, getTs,
"seek should land on the closest stored version for timestamp " + seekTimestamp);
}
}
}

@Test
public void testGetRowIds() throws SinglePointIndexException
{
Expand Down Expand Up @@ -207,6 +264,21 @@ public void testGetRowIds() throws SinglePointIndexException
assertTrue(rowIds.containsAll(result) && result.containsAll(rowIds), "getRowIds should return the rowId of all entries");
}

private static IndexProto.IndexKey indexKey(byte[] key, long timestamp)
{
return IndexProto.IndexKey.newBuilder()
.setIndexId(INDEX_ID)
.setKey(ByteString.copyFrom(key))
.setTimestamp(timestamp)
.build();
}

private static long extractTimestampFromUniqueKey(byte[] encodedKey)
{
ByteBuffer keyBuffer = ByteBuffer.wrap(encodedKey);
return Long.MAX_VALUE - keyBuffer.getLong(encodedKey.length - Long.BYTES);
}

@Test
public void testDeleteEntry() throws SinglePointIndexException
{
Expand Down Expand Up @@ -396,4 +468,4 @@ public void benchmarkDeleteEntry() throws SinglePointIndexException
double durationMs = (end - start) / 1_000_000.0;
System.out.printf("Deleted %,d entries in %.2f ms (%.2f ops/sec)%n", count, durationMs, count * 1000.0 / durationMs);
}
}
}
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@
<!-- slf4j-1.7.36 is compatible with 1.7.x that is widely used in third-party libraries -->
<dep.slf4j.version>1.7.36</dep.slf4j.version>

<dep.rocksdb.version>10.2.1</dep.rocksdb.version>

<!-- testing -->
<dep.junit4.version>4.13.2</dep.junit4.version>
<dep.junit.platform.version>1.8.2</dep.junit.platform.version>
Expand Down
Loading