Skip to content

Commit

Permalink
improve the RocksDBMapState.clear() via iterating the records directl…
Browse files Browse the repository at this point in the history
…y and deleting records using WriteBatch.
  • Loading branch information
sihuazhou committed May 10, 2018
1 parent 7c87c1a commit f0f533a
Showing 1 changed file with 32 additions and 19 deletions.
Expand Up @@ -35,6 +35,7 @@
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -225,11 +226,23 @@ public Map.Entry<UK, UV> next() {
@Override
public void clear() {
try {
Iterator<Map.Entry<UK, UV>> iterator = iterator();
try (RocksIterator iterator = backend.db.newIterator(columnFamily);
WriteBatch writeBatch = new WriteBatch(128)) {

while (iterator.hasNext()) {
iterator.next();
iterator.remove();
final byte[] keyPrefixBytes = serializeCurrentKeyAndNamespace();
iterator.seek(keyPrefixBytes);

while (iterator.isValid()) {
byte[] keyBytes = iterator.key();
if (startWithKeyPrefix(keyPrefixBytes, keyBytes)) {
writeBatch.remove(columnFamily, keyBytes);
} else {
break;
}
iterator.next();
}

backend.db.write(writeOptions, writeBatch);
}
} catch (Exception e) {
LOG.warn("Error while cleaning the state.", e);
Expand Down Expand Up @@ -356,6 +369,20 @@ private UV deserializeUserValue(byte[] rawValueBytes, TypeSerializer<UV> valueSe
return isNull ? null : valueSerializer.deserialize(in);
}

private boolean startWithKeyPrefix(byte[] keyPrefixBytes, byte[] rawKeyBytes) {
if (rawKeyBytes.length < keyPrefixBytes.length) {
return false;
}

for (int i = keyPrefixBytes.length; --i >= backend.getKeyGroupPrefixBytes(); ) {
if (rawKeyBytes[i] != keyPrefixBytes[i]) {
return false;
}
}

return true;
}

// ------------------------------------------------------------------------
// Internal Classes
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -572,7 +599,7 @@ private void loadCache() {
}

while (true) {
if (!iterator.isValid() || !underSameKey(iterator.key())) {
if (!iterator.isValid() || !startWithKeyPrefix(keyPrefixBytes, iterator.key())) {
expired = true;
break;
}
Expand All @@ -594,19 +621,5 @@ private void loadCache() {
}
}
}

private boolean underSameKey(byte[] rawKeyBytes) {
if (rawKeyBytes.length < keyPrefixBytes.length) {
return false;
}

for (int i = keyPrefixBytes.length; --i >= backend.getKeyGroupPrefixBytes(); ) {
if (rawKeyBytes[i] != keyPrefixBytes[i]) {
return false;
}
}

return true;
}
}
}

0 comments on commit f0f533a

Please sign in to comment.