Skip to content

Commit

Permalink
benchmark.
Browse files Browse the repository at this point in the history
  • Loading branch information
sihuazhou committed May 17, 2018
1 parent f0f533a commit 75504ad
Showing 1 changed file with 313 additions and 0 deletions.
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.util.TestLogger;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -33,14 +34,20 @@
import org.rocksdb.NativeLibraryLoader;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import sun.misc.Unsafe;

import javax.annotation.Nonnull;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;

/**
* Test that validates that the performance of RocksDB is as expected.
Expand Down Expand Up @@ -186,6 +193,312 @@ public void testRocksDbRangeGetPerformance() throws Exception {
}
}

/**
 * Benchmark comparing bulk insertion through a {@link WriteBatch} (flushed every
 * 500 puts) against one {@link RocksDB#put} call per record, asserting that the
 * batched variant is faster.
 *
 * <p>Both runs share the same key-generation trick: a 4-byte slot is appended to
 * the key template and the loop index is written into it via {@link Unsafe}, so
 * no per-iteration key allocation distorts the measurement.
 */
@Test
public void testWriteBatchVSPut() throws Exception {

	System.out.println("---------> Batch VS Put <------------");

	File rocksDirForBatch = tmp.newFolder();
	File rocksDirForPut = tmp.newFolder();

	final int num = 50000;

	long batchCost = measureBatchInsert(rocksDirForBatch, num);
	long putCost = measurePutInsert(rocksDirForPut, num);

	// NOTE(review): wall-clock comparisons are inherently flaky on loaded machines;
	// kept as-is because this test documents an expected performance relationship.
	Assert.assertTrue(batchCost < putCost);
}

/**
 * Inserts {@code num} records via a {@link WriteBatch} flushed every 500 puts
 * into a fresh RocksDB at {@code rocksDir}.
 *
 * @return the elapsed insertion time in nanoseconds
 */
private long measureBatchInsert(File rocksDir, int num) throws Exception {
	try (RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath());
		WriteBatch writeBatch = new WriteBatch()) {

		final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);

		final Unsafe unsafe = MemoryUtils.UNSAFE;
		// offset of the trailing 4-byte index slot within the key template
		final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;

		log.info("begin insert");

		final long beginInsert = System.nanoTime();
		for (int i = 0; i < num; i++) {
			unsafe.putInt(keyTemplate, offset, i);
			writeBatch.put(keyTemplate, valueBytes);
			if (i % 500 == 0) {
				rocksDB.write(writeOptions, writeBatch);
				writeBatch.clear();
			}
		}
		// flush the remaining entries of the last, partially-filled batch
		rocksDB.write(writeOptions, writeBatch);
		final long endInsert = System.nanoTime();
		log.info("BATCH: end insert - duration: {} ms", (endInsert - beginInsert) / 1_000_000);
		System.out.println("BATCH: end insert - duration:" + (endInsert - beginInsert) / 1_000_000);
		return endInsert - beginInsert;
	}
}

/**
 * Inserts {@code num} records with one {@link RocksDB#put} call per record
 * into a fresh RocksDB at {@code rocksDir}.
 *
 * @return the elapsed insertion time in nanoseconds
 */
private long measurePutInsert(File rocksDir, int num) throws Exception {
	try (RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath())) {

		final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);

		final Unsafe unsafe = MemoryUtils.UNSAFE;
		// offset of the trailing 4-byte index slot within the key template
		final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;

		log.info("begin insert");

		final long beginInsert = System.nanoTime();
		for (int i = 0; i < num; i++) {
			unsafe.putInt(keyTemplate, offset, i);
			rocksDB.put(writeOptions, keyTemplate, valueBytes);
		}
		final long endInsert = System.nanoTime();
		log.info("PUT: end insert - duration: {} ms", (endInsert - beginInsert) / 1_000_000);
		System.out.println("PUT: end insert - duration:" + (endInsert - beginInsert) / 1_000_000);
		return endInsert - beginInsert;
	}
}

/**
 * Benchmark comparing the "new" MapState#clear strategy (prefix scan collected into
 * a single {@link WriteBatch}) against the "old" one (per-entry deletes through
 * {@link RocksDBMapIterator}) across increasing record counts.
 *
 * <p>Printed output is identical to the previous copy-pasted version; the five
 * hard-coded runs are now driven by one loop.
 */
@Test
public void testMapStateClearNewVSOld() throws Exception {

	System.out.println("---------> MapState#Clear New VS Old <------------");

	// The first comparison may be inaccurate because of setup effects (cold
	// caches, JIT warm-up); it is kept for parity with the larger runs.
	for (int num : new int[] {50, 100, 200, 400, 800}) {
		long oldCost = testMapStateClearNewVSOld(num, false);
		long newCost = testMapStateClearNewVSOld(num, true);

		System.out.println("---->");
		System.out.println("NEW: end delete " + num + " records - duration:" + newCost);
		System.out.println("OLD: end delete " + num + " records - duration:" + oldCost);
	}
}

/**
 * Runs one clear-benchmark round: writes {@code num} prefixed records into a fresh
 * RocksDB instance, then measures how long deleting all of them takes using either
 * the new batched strategy or the old per-entry strategy.
 *
 * @param num number of records to insert and then delete
 * @param isNew true for the new strategy (iterator + single WriteBatch write),
 *              false for the old one (individual deletes via RocksDBMapIterator)
 * @return the deletion duration in nanoseconds
 */
public long testMapStateClearNewVSOld(int num, boolean isNew) throws Exception {

	File dbDir = tmp.newFolder();

	try (RocksDB rocksDB = RocksDB.open(options, dbDir.getAbsolutePath())) {

		// key = prefix + 4 trailing bytes holding the record index, written in
		// place via Unsafe so no key array is allocated per record
		final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);
		final Unsafe unsafe = MemoryUtils.UNSAFE;
		final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;

		for (int i = 0; i < num; i++) {
			unsafe.putInt(keyTemplate, offset, i);
			rocksDB.put(keyTemplate, valueBytes);
		}

		final long start = System.nanoTime();

		if (isNew) {
			// new strategy: collect every prefixed key into one WriteBatch,
			// then apply the whole batch with a single write
			try (RocksIterator iterator = rocksDB.newIterator();
				WriteBatch writeBatch = new WriteBatch()) {
				for (iterator.seek(keyBytes); iterator.isValid(); iterator.next()) {
					if (!samePrefix(keyBytes, iterator.key())) {
						break;
					}
					writeBatch.remove(iterator.key());
				}
				rocksDB.write(writeOptions, writeBatch);
			}
		} else {
			// old strategy: delete each entry individually while iterating
			RocksDBMapIterator iterator = new RocksDBMapIterator(rocksDB, keyBytes);
			while (iterator.hasNext()) {
				iterator.next().remove();
			}
		}

		return System.nanoTime() - start;
	}
}


/**
 * A {@link Map.Entry} view onto one raw RocksDB key/value pair, used by the "old"
 * map-state clear strategy. Supports per-entry deletion; values are read-only.
 *
 * <p>Fix: {@code getKey()} and {@code getValue()} previously were stubs returning
 * {@code null} even though the backing byte arrays exist, violating the
 * {@link Map.Entry} contract; they now expose the raw bytes. {@code setValue}
 * previously returned {@code null} silently and now signals that mutation is
 * unsupported, as the contract permits.
 */
class RocksDBMapEntry implements Map.Entry<byte[], byte[]> {
	private final RocksDB db;

	/** The raw bytes of the key stored in RocksDB. Each user key is stored in RocksDB
	 * with the format #KeyGroup#Key#Namespace#UserKey. */
	private final byte[] rawKeyBytes;

	/** The raw bytes of the value stored in RocksDB; null after {@link #remove()}. */
	private byte[] rawValueBytes;

	/** Set once {@link #remove()} has deleted this entry from the database. */
	private boolean deleted;

	RocksDBMapEntry(
		@Nonnull final RocksDB db,
		@Nonnull final byte[] rawKeyBytes,
		@Nonnull final byte[] rawValueBytes) {
		this.db = db;

		this.rawKeyBytes = rawKeyBytes;
		this.rawValueBytes = rawValueBytes;
	}

	/** Deletes this entry from RocksDB and drops the cached value bytes. */
	public void remove() {
		rawValueBytes = null;

		try {
			db.delete(writeOptions, rawKeyBytes);
			deleted = true;
		} catch (RocksDBException e) {
			throw new RuntimeException("Error while removing data from RocksDB.", e);
		}
	}

	@Override
	public byte[] getKey() {
		return rawKeyBytes;
	}

	@Override
	public byte[] getValue() {
		// null after remove() has been called
		return rawValueBytes;
	}

	@Override
	public byte[] setValue(byte[] value) {
		// entries obtained from the iterator are read-only in this benchmark
		throw new UnsupportedOperationException("setValue is not supported.");
	}
}

/**
 * Iterator over all RocksDB entries whose keys start with a given prefix. Entries
 * are loaded into an in-memory cache in chunks of {@link #CACHE_SIZE_LIMIT} so that
 * memory usage stays bounded regardless of how many entries share the prefix.
 * Models the "old" map-state iteration strategy.
 *
 * <p>Fixes: {@code next()} previously returned {@code null} when the iteration was
 * exhausted, violating the {@link Iterator} contract — it now throws
 * {@link java.util.NoSuchElementException}. {@code remove()} previously was a silent
 * no-op override — it now throws, since removal must go through
 * {@link RocksDBMapEntry#remove()}.
 */
public class RocksDBMapIterator implements Iterator<RocksDBMapEntry> {

	/** Maximum number of entries buffered per RocksDB scan. */
	private static final int CACHE_SIZE_LIMIT = 128;

	private final RocksDB db;

	/** Only keys starting with these bytes are visited. */
	private final byte[] keyPrefixBytes;

	/** True once the underlying scan has run past the last prefixed key. */
	private boolean expired = false;

	/** The current chunk of buffered entries. */
	private ArrayList<RocksDBMapEntry> cacheEntries = new ArrayList<>();

	/** Index of the next entry to hand out from {@link #cacheEntries}. */
	private int cacheIndex = 0;

	RocksDBMapIterator(
		final RocksDB db,
		final byte[] keyPrefixBytes) {

		this.db = db;
		this.keyPrefixBytes = keyPrefixBytes;
	}

	@Override
	public boolean hasNext() {
		loadCache();

		return (cacheIndex < cacheEntries.size());
	}

	@Override
	public RocksDBMapEntry next() {
		loadCache();

		if (cacheIndex == cacheEntries.size()) {
			if (!expired) {
				// the cache must never be empty while unread entries remain
				throw new IllegalStateException();
			}

			// was `return null`; the Iterator contract requires this exception
			throw new java.util.NoSuchElementException();
		}

		RocksDBMapEntry entry = cacheEntries.get(cacheIndex);
		cacheIndex++;

		return entry;
	}

	@Override
	public void remove() {
		// was a silent no-op; deletion is done through RocksDBMapEntry#remove instead
		throw new UnsupportedOperationException(
			"Use RocksDBMapEntry#remove() to delete entries.");
	}

	/** Refills the cache from RocksDB once the current chunk is fully consumed. */
	private void loadCache() {
		if (cacheIndex > cacheEntries.size()) {
			throw new IllegalStateException();
		}

		// Load cache entries only when the cache is empty and there still exist unread entries
		if (cacheIndex < cacheEntries.size() || expired) {
			return;
		}

		// use try-with-resources to ensure RocksIterator can be release even some runtime exception
		// occurred in the below code block.
		try (RocksIterator iterator = db.newIterator()) {

			/*
			 * The iteration starts from the prefix bytes at the first loading. The cache then is
			 * reloaded when the next entry to return is the last one in the cache. At that time,
			 * we will start the iterating from the last returned entry.
			 */
			RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? null : cacheEntries.get(cacheEntries.size() - 1);
			byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes);

			cacheEntries.clear();
			cacheIndex = 0;

			iterator.seek(startBytes);

			/*
			 * If the last returned entry is not deleted, it will be the first entry in the
			 * iterating. Skip it to avoid redundant access in such cases.
			 */
			if (lastEntry != null && !lastEntry.deleted) {
				iterator.next();
			}

			while (true) {
				if (!iterator.isValid() || !samePrefix(keyPrefixBytes, iterator.key())) {
					expired = true;
					break;
				}

				if (cacheEntries.size() >= CACHE_SIZE_LIMIT) {
					break;
				}

				RocksDBMapEntry entry = new RocksDBMapEntry(
					db,
					iterator.key(),
					iterator.value());

				cacheEntries.add(entry);

				iterator.next();
			}
		}
	}
}

private static boolean samePrefix(byte[] prefix, byte[] key) {
for (int i = 0; i < prefix.length; i++) {
if (prefix[i] != key [i]) {
Expand Down

0 comments on commit 75504ad

Please sign in to comment.